Hi, all. Currently, we use two separate interfaces
SupportsComputedColumnPushDown and SupportsWatermarkPushDown in the design. The interface SupportsWatermarkPushDown relies on SupportsComputedColumnPushDown when the watermark is defined on a computed column. During the implementation, we found that SupportsWatermarkPushDown uses an out-of-date interface, WatermarkProvider, and that SupportsComputedColumnPushDown duplicates SupportsProjectionPushDown. Therefore, we propose a new version of SupportsWatermarkPushDown to solve these problems.

*Problems of SupportsComputedColumnPushDown and SupportsWatermarkPushDown*

Problems of SupportsWatermarkPushDown

SupportsWatermarkPushDown currently uses an inner interface named WatermarkProvider to register a WatermarkGenerator in the DynamicTableSource. However, since FLIP-126 the community uses org.apache.flink.api.common.eventtime.WatermarkStrategy to create watermark generators. WatermarkStrategy is a factory of TimestampAssigner and WatermarkGenerator, and FlinkKafkaConsumer uses the method assignTimestampsAndWatermarks(WatermarkStrategy) to generate Kafka-partition-aware watermarks. The original WatermarkProvider, on the other hand, is used to create the deprecated AssignerWithPeriodicWatermarks and PunctuatedWatermarkAssignerProvider. Therefore, we think it's not suitable to use WatermarkProvider any more.

Problems of SupportsComputedColumnPushDown

There are two problems when using SupportsComputedColumnPushDown alone.

First, the planner transforms both computed columns and query expressions such as "select a+b" into a LogicalProject. When it comes to the optimization phase, we have no means to distinguish whether a RexNode in the projection comes from a computed column or from the query. So SupportsComputedColumnPushDown in reality pushes down not only the computed columns but also the calculations in the query.

Second, when a plan matches the rule PushComputedColumnIntoTableSourceScanRule, we have to build a new RowData to place all the fields we require. However, the two rules PushComputedColumnIntoTableSourceScanRule and PushProjectIntoTableSourceScanRule do the same work of pruning the records read from the source. It seems that we have two duplicate rules in the planner. But I think we should use PushProjectIntoTableSourceScanRule rather than PushComputedColumnIntoTableSourceScanRule if we don't support watermark push down: compared to PushComputedColumnIntoTableSourceScanRule, PushProjectIntoTableSourceScanRule is much lighter, and we can read pruned data from the source rather than use a map function in Flink.

Therefore, we think it's not a good idea to use two interfaces rather than one.

*New Proposal*

First of all, let us give some background on pushing watermarks into the table source scan. There are two plan structures that we need to consider. We list two examples below for discussion.

structure 1:

LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

structure 2:

LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

As we can see, structure 2 is much more complicated than structure 1. For structure 1, we can use the rows from the table scan to generate watermarks directly. But for structure 2, we need to evaluate the rowtime expression in the LogicalProject and use the result of that calculation to generate watermarks.
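To make this concrete, here is a minimal hand-written sketch of a WatermarkStrategy that folds the computed rowtime of structure 2 into its TimestampAssigner. The field index, timestamp precision, and interval are assumptions read off the example plan; in reality the planner would code-generate an equivalent strategy:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.data.RowData;

public final class Structure2StrategySketch {

    // Strategy for structure 2: rowtime d = c + 5s, watermark = d - 5s.
    public static WatermarkStrategy<RowData> create() {
        return WatermarkStrategy
                .<RowData>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((row, previousTimestamp) ->
                        // evaluate the rowtime expression on the raw scan row:
                        // column c is assumed at index 2 with TIMESTAMP(3)
                        row.getTimestamp(2, 3).getMillisecond() + 5000L);
    }
}

Because the timestamp extraction happens inside the strategy, the source can generate watermarks from the raw scan rows without the surrounding LogicalProject.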
Considering that WatermarkStrategy has the ability to extract the timestamp from a row, we propose to push only the WatermarkStrategy into the scan.

Push WatermarkStrategy to Scan

With this interface, we only push a WatermarkStrategy into the DynamicTableSource:

public interface SupportsWatermarkPushDown {
    void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
}

The advantage of the new API is that it is very simple for connector developers. They only need to take the WatermarkStrategy into consideration and don't need to deal with other information such as the ComputedColumnConverter in SupportsComputedColumnPushDown. One disadvantage is that the rowtime expression is calculated again in the LogicalProject, because we don't build a new row in the scan to store the calculated timestamp. However, we can replace the rowtime calculation in the LogicalProject with a reference that reads the previously calculated timestamp through the StreamRecord's getter, which eliminates the duplicate calculation. This optimization still has one limitation: it relies on computed columns not being defined on other computed columns. For nested computed columns, we have no place to store the intermediate results.

We still have one problem: when we push a UDF into the source, we need a context as powerful as FunctionContext to open the UDF. But the current WatermarkGeneratorSupplier.Context only supports the method getMetricGroup and misses the methods getCachedFile, getJobParameter and getExternalResourceInfos, which means we can't convert a WatermarkGeneratorSupplier.Context into a FunctionContext safely. Considering that the UDF is only used to generate watermarks, we suggest throwing UnsupportedOperationException when invoking the methods that exist in FunctionContext but not in WatermarkGeneratorSupplier.Context (a rough sketch of such an adapter is attached at the end of this mail). But we have to admit that there is a risk in doing so, because we have no guarantee that the UDF will not invoke these methods.

*Summary*

We have addressed the whole problem and solution in detail. In conclusion, I think the new interface avoids the problems we mentioned before. It has a clear definition, as its name tells, and has nothing in common with SupportsProjectionPushDown. As for its disadvantage, I think it's acceptable to calculate the rowtime column twice, and we also have a plan to improve its efficiency as a follow-up if it causes performance problems.
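Attached, as mentioned above: a rough sketch of the suggested adapter. The class name and shape are illustrative only (the method set is taken from the FunctionContext methods listed in this mail), not a proposed API:

import java.io.File;
import java.util.Set;

import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.metrics.MetricGroup;

// Adapts the limited WatermarkGeneratorSupplier.Context to the richer
// surface a UDF expects from FunctionContext; unsupported methods throw.
final class WatermarkFunctionContextSketch {

    private final WatermarkGeneratorSupplier.Context context;

    WatermarkFunctionContextSketch(WatermarkGeneratorSupplier.Context context) {
        this.context = context;
    }

    public MetricGroup getMetricGroup() {
        // the only capability both contexts have in common
        return context.getMetricGroup();
    }

    public File getCachedFile(String name) {
        throw new UnsupportedOperationException(
                "getCachedFile is not supported for watermark UDFs");
    }

    public String getJobParameter(String key, String defaultValue) {
        throw new UnsupportedOperationException(
                "getJobParameter is not supported for watermark UDFs");
    }

    public Set<?> getExternalResourceInfos(String resourceName) {
        throw new UnsupportedOperationException(
                "getExternalResourceInfos is not supported for watermark UDFs");
    }
}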
Hi, all. It seems the formatting is broken, so I have added a google doc, see
link [1]. This discussion is about the interfaces in FLIP-95: New Table Source And Table Sink and proposes to merge the two interfaces SupportsWatermarkPushDown and SupportsComputedColumnPushDown.

I am looking forward to any opinions and suggestions from the community.

Regards,
Shengkai

[1] https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
Thanks to Shengkai for summarizing the problems with the FLIP-95 interfaces
and their solutions.

I think the new proposal, i.e. only pushing the WatermarkStrategy, is much cleaner and easier to develop against than before. So I'm +1 to the proposal.

Best,
Jark
Hi Shengkai,
first of all, I would not consider the section "Problems of SupportsWatermarkPushDown" a "problem". It was planned to update the WatermarkProvider once the interfaces are ready. See the comment in WatermarkProvider:

// marker interface that will be filled after FLIP-126:
// WatermarkGenerator<RowData> getWatermarkGenerator();

So far we had no sources that actually implement WatermarkStrategy.

Second, for generating watermarks I don't see a problem in merging the two mentioned interfaces SupportsWatermarkPushDown and SupportsComputedColumnPushDown into one. The described design sounds reasonable to me, and the impact on performance should not be too large.

However, by merging these two interfaces we are also merging two completely separate concepts. Computed columns are not always used for generating a rowtime or watermark. Users can and certainly will implement more complex logic in there. Examples could be decrypting encrypted records/columns, performing checksum checks, reading metadata, etc.

So in any case we should still provide two interfaces:

SupportsWatermarkPushDown (functionality of computed columns + watermarks)

SupportsComputedColumnPushDown (functionality of computed columns only)

I'm fine with such a design, but it is also confusing for implementers that SupportsWatermarkPushDown includes the functionality of the other interface.

What do you think?

Regards,
Timo
Hi Timo,
Regarding "pushing other computed columns into source, e.g. encrypted records/columns, performing checksum checks, reading metadata etc.", I'm not sure about this. 1. the planner don't know which computed column should be pushed into source 2. it seems that we can't improve performances if we pushdown complex logic into source, we still need to calculate them anyway. 3. the computed column is a regular expression, if the computed column should be pushed down, then shall we push the expressions in the following Projection too? If yes, then the name of "SupportsComputedColumnPushDown" might be not correct. 4. regarding reading metadata, according to FLIP-107, we don't use the existing SupportsComputedColumnPushDown, but a new interface. Therefore, I don't find a strong use case that needs this interface so far. Best, Jark On Tue, 8 Sep 2020 at 17:13, Timo Walther <[hidden email]> wrote: > Hi Shengkai, > > first of I would not consider the section Problems of > SupportsWatermarkPushDown" as a "problem". It was planned to update the > WatermarkProvider once the interfaces are ready. See the comment in > WatermarkProvider: > > // marker interface that will be filled after FLIP-126: > // WatermarkGenerator<RowData> getWatermarkGenerator(); > > So far we had no sources that actually implement WatermarkStrategy. > > Second, for generating watermarks I don't see a problem in merging the > two mentioned interfaces SupportsWatermarkPushDown and > SupportsComputedColumnPushDown into one. The descibed design sounds > reasonable to me and the impact on performance should not be too large. > > However, by merging these two interfaces we are also merging two > completely separate concepts. Computed columns are not always used for > generating a rowtime or watermark. Users can and certainly will > implement more complex logic in there. One example could be decrypting > encrypted records/columns, performing checksum checks, reading metadata > etc. > > So in any case we should still provide two interfaces: > > SupportsWatermarkPushDown (functionality of computed columns + watermarks) > > > SupportsComputedColumnPushDown (functionality of computed columns only) > > I'm fine with such a design, but it is also confusing for implementers > that SupportsWatermarkPushDown includes the functionality of the other > interface. > > What do you think? > > Regards, > Timo > > > On 08.09.20 04:32, Jark Wu wrote: > > Thanks to Shengkai for summarizing the problems on the FLIP-95 interfaces > > and solutions. > > > > I think the new proposal, i.e. only pushing the "WatermarkStrategy" is > much > > cleaner and easier to develop than before. > > So I'm +1 to the proposal. > > > > Best, > > Jark > > > > On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <[hidden email]> wrote: > > > >> Hi, all. It seems the format is not normal. So I add a google doc in > >> link[1]. This discussion is about the interfaces in FLIP-95: New Table > >> Source And Table Sink and propose to merge two interfaces > >> SupportsWatermarkPushDown and SupportsComputedColumnPushDown. > >> > >> I am looking forward to any opinions and suggestions from the community. > >> > >> Regards, > >> Shengkai > >> > >> [1] > >> > >> > https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit# > >> > >> Shengkai Fang <[hidden email]> 于2020年9月4日周五 下午2:58写道: > >> > >>> Hi, all. Currently, we use two seperated interfaces > >>> SupportsComputedColumnPushDown and SupportsWatermarkPushDown in design. 
Hi Timo and Jark. Thanks for your replies.
Maybe I didn't explain it clearly in the doc. I think the main reason is that we have no means to distinguish the calculations in a LogicalProject. Let me give you an example to illustrate this. Assume we have two cases:

case 1:

create table MyTable (
  a int,
  b int
) with (
  ...
)

and we use the sql "select a, b, a+b as c from MyTable" to get the results.

case 2:

create table MyTableWithComputedColumn (
  a int,
  b int,
  c as a + b
) with (
  ...
)

and we use the sql "select a, b, c from MyTableWithComputedColumn" to get the results.

When it comes to the planner, the two queries have exactly the same plan, which means we would also push calculations from the query into the scan if we supported computed column push down.

As a supplement to Jark's response: currently org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#assignTimestampsAndWatermarks(WatermarkStrategy) uses WatermarkStrategy to register the watermark generator supplier. I think it's fine to use WatermarkStrategy directly because FLIP-126 has been finished; a sketch of how a Kafka table source could forward the pushed strategy follows below.
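For illustration, a minimal sketch of how a Kafka table source might implement the proposed interface. The class name and wiring are assumptions, not the actual connector code; only FlinkKafkaConsumer#assignTimestampsAndWatermarks is the real API mentioned above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.data.RowData;

// Sketch: the source remembers the pushed strategy and hands it to the
// FlinkKafkaConsumer when the scan runtime provider is created.
public abstract class KafkaSourceSketch implements SupportsWatermarkPushDown {

    private WatermarkStrategy<RowData> watermarkStrategy;

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    // Later, e.g. when building the scan runtime provider:
    //
    //   FlinkKafkaConsumer<RowData> consumer = createConsumer(...);
    //   if (watermarkStrategy != null) {
    //       // generates Kafka-partition-aware watermarks (FLIP-126)
    //       consumer.assignTimestampsAndWatermarks(watermarkStrategy);
    //   }
}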
Hi Jark, Hi Shengkai,
"shall we push the expressions in the following Projection too?" This is something that we should at least consider. I also don't find a strong use case. But what I see is that we are merging concepts that actually can be separated. And we are executing the same code twice. Regardless of what kind of code is executed (simple timestamp casting or more complex stuff). In any case, we cannot model ingestion time with the merged interfaces. Because the computed timestamp column is evaluated twice. If a function in the computed column is not deterministic, a watermark_rowtime != projection_rowtime mismatch is very hard to debug. Regards, Timo On 08.09.20 14:11, Shengkai Fang wrote: > Hi Timo and Jark.Thanks for your replies. > > Maybe I don't explain clearly in doc. I think the main reason behind is we > have no means to distinguish the calc in LogicalProject. Let me give you an > example to illustrate the reason. Assume we have 2 cases: > > > case 1: > > create table MyTable ( > > int a, > > int b > > ) with ( > > ... > > ) > > > we use sql "select a, b, a+b as c from MyTable" to get the results. > > > and > > > case 2: > > create table MyTableWithComputedColumn ( > > a int, > > b int, > > c as a + b > > ) with ( > > ... > > ) > > > we use sql "select a, b, c from MyTableWithComputedColumn" to get the > results. > > > When coming to planner, the two sqls will have the same plan, which means > we will also push calculation from query to scan if we support computed > column push down. > > > As a supplement of Jark's response, currently > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#assignTimestampAndWatermarks(WatermarkStrategy) > uses WatermarkStrategy to register watermark generator supplier. I think > it's ok to use WatermarkStrategy directly because FLIP-126 has been > finished. > > > > Jark Wu <[hidden email]> 于2020年9月8日周二 下午7:38写道: > >> Hi Timo, >> >> Regarding "pushing other computed columns into source, e.g. encrypted >> records/columns, performing checksum checks, reading metadata etc.", >> I'm not sure about this. >> 1. the planner don't know which computed column should be pushed into >> source >> 2. it seems that we can't improve performances if we pushdown complex logic >> into source, we still need to calculate them anyway. >> 3. the computed column is a regular expression, if the computed column >> should be pushed down, then shall we push the expressions in the following >> Projection too? >> If yes, then the name of "SupportsComputedColumnPushDown" might be not >> correct. >> 4. regarding reading metadata, according to FLIP-107, we don't use the >> existing SupportsComputedColumnPushDown, but a new interface. >> >> Therefore, I don't find a strong use case that needs this interface so far. >> >> Best, >> Jark >> >> >> >> >> On Tue, 8 Sep 2020 at 17:13, Timo Walther <[hidden email]> wrote: >> >>> Hi Shengkai, >>> >>> first of I would not consider the section Problems of >>> SupportsWatermarkPushDown" as a "problem". It was planned to update the >>> WatermarkProvider once the interfaces are ready. See the comment in >>> WatermarkProvider: >>> >>> // marker interface that will be filled after FLIP-126: >>> // WatermarkGenerator<RowData> getWatermarkGenerator(); >>> >>> So far we had no sources that actually implement WatermarkStrategy. >>> >>> Second, for generating watermarks I don't see a problem in merging the >>> two mentioned interfaces SupportsWatermarkPushDown and >>> SupportsComputedColumnPushDown into one. 
Therefore, we think it's not >>>>>> suitable to use the WatermarkProvider any more. >>>>>> >>>>>> >>>>>> Problems of SupportsComputedColumnPushDown >>>>>> >>>>>> There are two problems around when using >> SupportsComputedColumnPushDown >>>>>> alone. >>>>>> >>>>>> First, planner will transform the computed column and query such as >>>>> select >>>>>> a+b to a LogicalProject. When it comes to the optimization phase, we >>> have >>>>>> no means to distinguish whether the Rexnode in the projection is from >>>>>> computed columns or query. So SupportsComputedColumnPushDown in >> reality >>>>>> will push not only the computed column but also the calculation in >> the >>>>>> query. >>>>>> >>>>>> Second, when a plan matches the rule >>>>>> PushComputedColumnIntoTableSourceScanRule, we have to build a new >>>>> RowData to >>>>>> place all fields we require. However, both two rules >>>>>> PushComputedColumnIntoTableSourceScanRule and >>>>>> PushProjectIntoTableSourceScanRule will do the same work that prune >> the >>>>>> records that read from source. It seems that we have two duplicate >>> rules >>>>> in >>>>>> planner. But I think we should use the rule >>>>>> PushProjectIntoTableSourceScanRule rather than >>>>>> PushComputedColumnIntoTableSourceScanRule if we don't support >> watermark >>>>>> push down. Compared to PushComputedColumnIntoTableSourceScanRule, >>>>>> PushProjectIntoTableSourceScanRule is much lighter and we can read >>> pruned >>>>>> data from source rather than use a map function in flink. >>>>>> >>>>>> Therefore, we think it's not a good idea to use two interfaces rather >>>>> than >>>>>> one. >>>>>> >>>>>> >>>>>> *New Proposal* >>>>>> >>>>>> First of all, let us address some background when pushing watermarks >>> into >>>>>> table source scan. There are two structures that we need to consider. >>> We >>>>>> list two examples below for discussion. >>>>>> >>>>>> structure 1:LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, >>>>> 5000:INTERVAL SECOND)])+- LogicalTableScan(table=[[default_catalog, >>>>> default_database, MyTable]])structure >>>>> 2:LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL >>>>> SECOND)])+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, >>> 5000:INTERVAL >>>>> SECOND)]) +- LogicalTableScan(table=[[default_catalog, >>> default_database, >>>>> MyTable]]) >>>>>> >>>>>> As we can see, structure 2 is much more complicated than structure 1. >>> For >>>>>> structure 1, we can use the row from table scan to generate >> watermarks >>>>>> directly. But for structure 2, we need to calculate the rowtime >>>>> expression >>>>>> in LogicalProject and use the result of calculation to generate >>>>>> watermarks. Considering that WatermarkStrategy has the ability to >>> extract >>>>>> timestamp from row, we have a proposal to push only WatermarkStrategy >>> to >>>>>> scan. >>>>>> >>>>>> >>>>>> Push WatermarkStrategy to Scan >>>>>> >>>>>> In this interface, we will only push WatermarkStrategy to >>>>>> DynamicTableSource. >>>>>> >>>>>> public interface SupportsWatermarkPushDown { void >>>>> applyWatermark(WatermarkStrategy<RowData> watermarkStrategy); } >>>>>> >>>>>> The advantage of the new api is that it's very simple for the >>> developers >>>>>> of Connector. They only need to take WatermarkStrategy into >>> consideration >>>>>> and don't need to deal with other infos such as >> ComputedColumnConverter >>>>>> in SupportsComputedColumnPushDown. 
But it also has one disadvantage >>> that >>>>>> it needs to calculate the rowtime expression again in >> LogicalProjection >>>>>> because we don't build a new row in scan to store the calculated >>>>> timestamp. >>>>>> However, we can replace the calculation of rowtime in >> LogicalProjection >>>>>> with a reference to eliminate duplicate calculation, which will use >> the >>>>>> StreamRecord's getter to read the timestamp that is calculated >> before. >>>>> But >>>>>> this optimization still has one limitation that it relies on computed >>>>>> columns are not defined on other computed columns. For nested >> computed >>>>>> columns, we have no place to save the intermediate result. >>>>>> >>>>>> But we still have a problem that when we push an udf into the source, >>> we >>>>>> need a context as powerful as FunctionContext to open the udf. But >> the >>>>>> current WatermarkGeneratorSupplier.Context only supports method >>>>>> getMetricGroup and misses methods getCachedFile, getJobParameter and >> g >>>>>> etExternalResourceInfos, which means we can't convert the >>>>>> WatermarkGeneratorSupplier.Context to FunctionContext safely. >>> Considering >>>>>> that the udf is only used to generate watermark, we suggest to throw >>>>>> UnsupportedException when invoking the methods exist in >> FunctionContext >>>>>> but don't exist in WatermarkGeneratorSupplier.Context. But we have to >>>>>> admit that there are risks in doing so because we have no promise >> that >>>>> the >>>>>> udf will not invoke these methods. >>>>>> >>>>>> >>>>>> *Summary* >>>>>> >>>>>> We have addressed the whole problem and solution in detail. As a >>>>>> conclusion, I think the new interface avoids the problems we >> mentioned >>>>>> before. It has a clear definition as its name tells and has nothing >> in >>>>>> common with SupportProjectionPushDown. As for its disadvantage, I >> think >>>>>> it's acceptable to calculate the rowtime column twice and we also >> have >>> a >>>>>> plan to improve its efficiency as a follow up if it brings some >>>>> performance >>>>>> problems. >>>>>> >>>>>> >>>>>> >>>>> >>>> >>> >>> >> > |
Hi, Timo.
I agree with you that the concepts Watermark and ComputedColumn should be separated. However, what we are actually merging is SupportsCalcPushDown and SupportsWatermarkPushDown: the concept of a computed column has already disappeared by the optimization phase. As for the drawback you mentioned, I have already given a solution: we can place the calculated timestamp column on the StreamRecord and replace the calculation in LogicalProject with a call of StreamRecordTimestampSqlFunction, which reads the timestamp from the StreamRecord.

Regards,
Shengkai
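A sketch of the read-back idea Shengkai proposes, assuming the pushed TimestampAssigner has already attached the computed rowtime to the StreamRecord (the ProcessFunction here is a hypothetical stand-in for the code the planner would generate in place of StreamRecordTimestampSqlFunction):

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.util.Collector;

    // Hypothetical stand-in for the generated projection code: instead of
    // re-evaluating the rowtime expression, it reads the timestamp that the
    // source-side TimestampAssigner set on the StreamRecord.
    public class RowtimeReadBack extends ProcessFunction<RowData, RowData> {
        @Override
        public void processElement(RowData row, Context ctx, Collector<RowData> out) {
            // ctx.timestamp() returns the StreamRecord timestamp, so the
            // computed-column expression is evaluated exactly once, at the
            // source.
            Long rowtime = ctx.timestamp();
            // ... append `rowtime` to the output row here instead of
            // re-computing the expression ...
            out.collect(row);
        }
    }

Note that a StreamRecord carries exactly one timestamp field, which is why this works for the top-level rowtime column but leaves no place to save intermediate results for nested computed columns.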