Hi,
I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload majority (if not all of the sources) expose additional information. It can be simply a read-only metadata such as offset, ingestion time or a read and write parts of the record that contain data but additionally serve different purposes (partitioning, compaction etc.), e.g. key or timestamp in Kafka. We should make it possible to read and write data from all of those locations. In this proposal I discuss reading partitioning data, for completeness this proposal discusses also the partitioning when writing data out. I am looking forward to your comments. You can access the FLIP here: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode Best, Dawid signature.asc (849 bytes) Download Attachment |
Hi,
Thanks Dawid for starting such a great discussion. Reaing metadata and key-part information from source is an important feature for streaming users. In general, I agree with the proposal of the FLIP. I will leave my thoughts and comments here: 1) +1 to use connector properties instead of introducing HEADER keyword as the reason you mentioned in the FLIP. 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain what's the relationship between them. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP? 3) Currently, properties are hierarchical in Flink SQL. Shall we make the new introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"? (actually, I prefer "kafka.timestamp" which is another improvement for properties FLINK-12557) A single "timestamp" in properties may mislead users that the field is a rowtime attribute. I also left some minor comments in the FLIP. Thanks, Jark On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <[hidden email]> wrote: > Hi, > > I would like to propose an improvement that would enable reading table > columns from different parts of source records. Besides the main payload > majority (if not all of the sources) expose additional information. It > can be simply a read-only metadata such as offset, ingestion time or a > read and write parts of the record that contain data but additionally > serve different purposes (partitioning, compaction etc.), e.g. key or > timestamp in Kafka. > > We should make it possible to read and write data from all of those > locations. In this proposal I discuss reading partitioning data, for > completeness this proposal discusses also the partitioning when writing > data out. > > I am looking forward to your comments. > > You can access the FLIP here: > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode > > Best, > > Dawid > > > |
Hi Jark, Ad. 2 I added a section to discuss relation to FLIP-63 Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. Therefore you have the key.format.type. I also considered exactly what you are suggesting (prefixing with
connector or kafka). I should've put that into an Option/Rejected
alternatives. I agree timestamp, key.*, value.* are connector properties. Why I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is in the end a connector property as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or if we go with FLINK-12557: elasticsearch): elasticsearch.key.format.type: csv elasticsearch.key.format.field: .... elasticsearch.key.format.delimiter: .... elasticsearch.key.format.*: .... I am fine with doing it though if this is a preferred approach in the community. Ad in-line comments: I forgot to update the `value.fields.include` property. It should be value.fields-include. Which I think you also suggested in the comment, right? As for the cast vs declaring output type of computed column. I think it's better not to use CAST, but declare a type of an expression and later on infer the output type of SYSTEM_METADATA. The reason is I think this way it will be easier to implement e.g. filter push downs when working with the native types of the source, e.g. in case of Kafka's offset, i think it's better to pushdown long rather than string. This could let us push expression like e.g. offset > 12345 & offset < 59382. Otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover I think we need to introduce the type for computed columns anyway to support functions that infer output type based on expected return type. As for the computed column push down. Yes, SYSTEM_METADATA would have to be pushed down to the source. If it is not possible the planner should fail. As far as I know computed columns push down will be part of source rework, won't it? ;) As for the persisted computed column. I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a udf in partitioned by vs partitioned by a computed column is that when you partition by a computed column this column must be also computed when reading the table. If you use a udf in the partitioned by, the expression is computed only when inserting into the table. Hope this answers some of your questions. Looking forward for further suggestions. Best, Dawid
On 02/03/2020 05:18, Jark Wu wrote:
Hi, Thanks Dawid for starting such a great discussion. Reaing metadata and key-part information from source is an important feature for streaming users. In general, I agree with the proposal of the FLIP. I will leave my thoughts and comments here: 1) +1 to use connector properties instead of introducing HEADER keyword as the reason you mentioned in the FLIP. 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain what's the relationship between them. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP? 3) Currently, properties are hierarchical in Flink SQL. Shall we make the new introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"? (actually, I prefer "kafka.timestamp" which is another improvement for properties FLINK-12557) A single "timestamp" in properties may mislead users that the field is a rowtime attribute. I also left some minor comments in the FLIP. Thanks, Jark On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz [hidden email] wrote:Hi, I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload majority (if not all of the sources) expose additional information. It can be simply a read-only metadata such as offset, ingestion time or a read and write parts of the record that contain data but additionally serve different purposes (partitioning, compaction etc.), e.g. key or timestamp in Kafka. We should make it possible to read and write data from all of those locations. In this proposal I discuss reading partitioning data, for completeness this proposal discusses also the partitioning when writing data out. I am looking forward to your comments. You can access the FLIP here: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode Best, Dawid signature.asc (849 bytes) Download Attachment |
Hi Dawid,
> connector properties Could we use "timestamp.field" instead of "timestamp"? This will be more consistent with "key.fields" and it can avoid to confuse users it defines a rowtime attribute (KSQL [1] use "timestamp" property to override ROWTIME information). > SYSTEM_METADATA(...) I agree SYSTEM_METADATA computed column with returning type is the most clear way to support accessing read-only information. We may need further discussion about the implementation details, e.g. how to represent such computed column (esp. the returning type) in Calcite SqlFunction? > The difference between using a udf in partitioned by vs partitioned by a computed column is that when you partition by a computed column this column must be also computed when reading the table. The computed column is not necessary to be computed when reading the table if the column is not used in the query. This can be done by the optimizer. [1]: https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html#ksql-timestamp-formats On Mon, 2 Mar 2020 at 18:16, Dawid Wysakowicz <[hidden email]> wrote: > Hi Jark, > > Ad. 2 I added a section to discuss relation to FLIP-63 > > Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. > Therefore you have the key.format.type. > > I also considered exactly what you are suggesting (prefixing with > connector or kafka). I should've put that into an Option/Rejected > alternatives. > > I agree timestamp, key.*, value.* are connector properties. Why I wanted > to suggest not adding that prefix in the first version is that actually all > the properties in the WITH section are connector properties. Even format is > in the end a connector property as some of the sources might not have a > format, imo. The benefit of not adding the prefix is that it makes the keys > a bit shorter. Imagine prefixing all the properties with connector (or if > we go with FLINK-12557: elasticsearch): > > elasticsearch.key.format.type: csv > > elasticsearch.key.format.field: .... > > elasticsearch.key.format.delimiter: .... > > elasticsearch.key.format.*: .... > > I am fine with doing it though if this is a preferred approach in the > community. > > Ad in-line comments: > > I forgot to update the `value.fields.include` property. It should be *value.fields-include. > *Which I think you also suggested in the comment, right? > > As for the cast vs declaring output type of computed column. I think it's > better not to use CAST, but declare a type of an expression and later on > infer the output type of SYSTEM_METADATA. The reason is I think this way it > will be easier to implement e.g. filter push downs when working with the > native types of the source, e.g. in case of Kafka's offset, i think it's > better to pushdown long rather than string. This could let us push > expression like e.g. offset > 12345 & offset < 59382. Otherwise we would > have to push down cast(offset, long) > 12345 && cast(offset, long) < > 59382. Moreover I think we need to introduce the type for computed columns > anyway to support functions that infer output type based on expected return > type. > > As for the computed column push down. Yes, SYSTEM_METADATA would have to > be pushed down to the source. If it is not possible the planner should > fail. As far as I know computed columns push down will be part of source > rework, won't it? ;) > > As for the persisted computed column. I think it is completely orthogonal. > In my current proposal you can also partition by a computed column. The > difference between using a udf in partitioned by vs partitioned by a > computed column is that when you partition by a computed column this column > must be also computed when reading the table. If you use a udf in the > partitioned by, the expression is computed only when inserting into the > table. > > Hope this answers some of your questions. Looking forward for further > suggestions. > > Best, > > Dawid > > > > On 02/03/2020 05:18, Jark Wu wrote: > > Hi, > > Thanks Dawid for starting such a great discussion. Reaing metadata and > key-part information from source is an important feature for streaming > users. > > In general, I agree with the proposal of the FLIP. > I will leave my thoughts and comments here: > > 1) +1 to use connector properties instead of introducing HEADER keyword as > the reason you mentioned in the FLIP. > 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should add a > section to explain what's the relationship between them. > Do their concepts conflict? Could INSERT PARTITION be used on the > PARTITIONED table in this FLIP? > 3) Currently, properties are hierarchical in Flink SQL. Shall we make the > new introduced properties more hierarchical? > For example, "timestamp" => "connector.timestamp"? (actually, I prefer > "kafka.timestamp" which is another improvement for properties FLINK-12557) > A single "timestamp" in properties may mislead users that the field is > a rowtime attribute. > > I also left some minor comments in the FLIP. > > Thanks, > Jark > > > > On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <[hidden email]> <[hidden email]> > wrote: > > > Hi, > > I would like to propose an improvement that would enable reading table > columns from different parts of source records. Besides the main payload > majority (if not all of the sources) expose additional information. It > can be simply a read-only metadata such as offset, ingestion time or a > read and write parts of the record that contain data but additionally > serve different purposes (partitioning, compaction etc.), e.g. key or > timestamp in Kafka. > > We should make it possible to read and write data from all of those > locations. In this proposal I discuss reading partitioning data, for > completeness this proposal discusses also the partitioning when writing > data out. > > I am looking forward to your comments. > > You can access the FLIP here: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode > > Best, > > Dawid > > > > > |
In reply to this post by dwysakowicz
Hi Dawid,
Thanks for driving this FLIP,big +1 for the proposal feature. About the connector.properties part, I suggest avoid using timestamp because timestamp is a keyword in DDL as dataType, user may feel confused, using 'timestamp.filed’ or ’source.timestamp’ will be better? ``` CREATE TABLE kafka_table ( id BIGINT, eventType STRING, timestamp TIMESTAMP(3) ) WITH ( 'connector.type' = 'kafka', 'value.format.type' = 'avro’, 'timestamp' = 'timestamp' ) ``` Another minor comment, we could use `timestamp` replaces timestamp in column definition of the example. Best, Leonard > 在 2020年3月1日,22:30,Dawid Wysakowicz <[hidden email]> 写道: > > Hi, > > I would like to propose an improvement that would enable reading table > columns from different parts of source records. Besides the main payload > majority (if not all of the sources) expose additional information. It > can be simply a read-only metadata such as offset, ingestion time or a > read and write parts of the record that contain data but additionally > serve different purposes (partitioning, compaction etc.), e.g. key or > timestamp in Kafka. > > We should make it possible to read and write data from all of those > locations. In this proposal I discuss reading partitioning data, for > completeness this proposal discusses also the partitioning when writing > data out. > > I am looking forward to your comments. > > You can access the FLIP here: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode > > Best, > > Dawid > > |
In reply to this post by dwysakowicz
Thanks Dawid for bringing up this discussion, I think it is a useful feature ~
About how the metadata outputs from source I think it is completely orthogonal, computed column push down is another topic, this should not be a blocker but a promotion, if we do not have any filters on the computed column, there is no need to do any pushings; the source node just emit the complete record with full metadata with the declared physical schema, then when generating the virtual columns, we would extract the metadata info and output as full columns(with full schema). About the type of metadata column Personally i prefer explicit type instead of CAST, they are symantic equivalent though, explict type is more straight-forward and we can declare the nullable attribute there. About option A: partitioning based on acomputed column VS option B: partitioning with just a function From the FLIP, it seems that B's partitioning is just a strategy when writing data, the partiton column is not included in the table schema, so it's just useless when reading from that. - Compared to A, we do not need to generate the partition column when selecting from the table(but insert into) - For A we can also mark the column as STORED when we want to persist that So in my opition they are orthogonal, we can support both, i saw that MySQL/Oracle[1][2] would suggest to also define the PARTITIONS num, and the partitions are managed under a "tablenamespace", the partition in which the record is stored is partition number N, where N = MOD(expr, num), for your design, which partiton the record would persist ? [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html [2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 Best, Danny Chan 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz <[hidden email]>,写道: > Hi Jark, > Ad. 2 I added a section to discuss relation to FLIP-63 > Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. Therefore you have the key.format.type. > I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into an Option/Rejected alternatives. > I agree timestamp, key.*, value.* are connector properties. Why I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is in the end a connector property as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or if we go with FLINK-12557: elasticsearch): > elasticsearch.key.format.type: csv > elasticsearch.key.format.field: .... > elasticsearch.key.format.delimiter: .... > elasticsearch.key.format.*: .... > I am fine with doing it though if this is a preferred approach in the community. > Ad in-line comments: > I forgot to update the `value.fields.include` property. It should be value.fields-include. Which I think you also suggested in the comment, right? > As for the cast vs declaring output type of computed column. I think it's better not to use CAST, but declare a type of an expression and later on infer the output type of SYSTEM_METADATA. The reason is I think this way it will be easier to implement e.g. filter push downs when working with the native types of the source, e.g. in case of Kafka's offset, i think it's better to pushdown long rather than string. This could let us push expression like e.g. offset > 12345 & offset < 59382. Otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover I think we need to introduce the type for computed columns anyway to support functions that infer output type based on expected return type. > As for the computed column push down. Yes, SYSTEM_METADATA would have to be pushed down to the source. If it is not possible the planner should fail. As far as I know computed columns push down will be part of source rework, won't it? ;) > As for the persisted computed column. I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a udf in partitioned by vs partitioned by a computed column is that when you partition by a computed column this column must be also computed when reading the table. If you use a udf in the partitioned by, the expression is computed only when inserting into the table. > Hope this answers some of your questions. Looking forward for further suggestions. > Best, > Dawid > > > On 02/03/2020 05:18, Jark Wu wrote: > > Hi, > > > > Thanks Dawid for starting such a great discussion. Reaing metadata and > > key-part information from source is an important feature for streaming > > users. > > > > In general, I agree with the proposal of the FLIP. > > I will leave my thoughts and comments here: > > > > 1) +1 to use connector properties instead of introducing HEADER keyword as > > the reason you mentioned in the FLIP. > > 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should add a > > section to explain what's the relationship between them. > > Do their concepts conflict? Could INSERT PARTITION be used on the > > PARTITIONED table in this FLIP? > > 3) Currently, properties are hierarchical in Flink SQL. Shall we make the > > new introduced properties more hierarchical? > > For example, "timestamp" => "connector.timestamp"? (actually, I prefer > > "kafka.timestamp" which is another improvement for properties FLINK-12557) > > A single "timestamp" in properties may mislead users that the field is > > a rowtime attribute. > > > > I also left some minor comments in the FLIP. > > > > Thanks, > > Jark > > > > > > > > On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <[hidden email]> > > wrote: > > > > > Hi, > > > > > > I would like to propose an improvement that would enable reading table > > > columns from different parts of source records. Besides the main payload > > > majority (if not all of the sources) expose additional information. It > > > can be simply a read-only metadata such as offset, ingestion time or a > > > read and write parts of the record that contain data but additionally > > > serve different purposes (partitioning, compaction etc.), e.g. key or > > > timestamp in Kafka. > > > > > > We should make it possible to read and write data from all of those > > > locations. In this proposal I discuss reading partitioning data, for > > > completeness this proposal discusses also the partitioning when writing > > > data out. > > > > > > I am looking forward to your comments. > > > > > > You can access the FLIP here: > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode > > > > > > Best, > > > > > > Dawid > > > > > > > > > |
Hi,
1. I thought a bit more on how the source would emit the columns and I now see its not exactly the same as regular columns. I see a need to elaborate a bit more on that in the FLIP as you asked, Jark. I do agree mostly with Danny on how we should do that. One additional things I would introduce is an interface SupportsMetadata { boolean supportsMetadata(Set<String> metadataFields); TableSource generateMetadataFields(Set<String> metadataFields); } This way the source would have to declare/emit only the requested metadata fields. In order not to clash with user defined fields. When emitting the metadata field I would prepend the column name with __system_{property_name}. Therefore when requested SYSTEM_METADATA("partition") the source would append a field __system_partition to the schema. This would be never visible to the user as it would be used only for the subsequent computed columns. If that makes sense to you, I will update the FLIP with this description. 2. CAST vs explicit type in computed columns Here I agree with Danny. It is also the current state of the proposal. 3. Partitioning on computed column vs function Here I also agree with Danny. I also think those are orthogonal. I would leave out the STORED computed columns out of the discussion. I don't see how do they relate to the partitioning. I already put both of those cases in the document. We can either partition on a computed column or use a udf in a partioned by clause. I am fine with leaving out the partitioning by udf in the first version if you still have some concerns. As for your question Danny. It depends which partitioning strategy you use. For the HASH partitioning strategy I thought it would work as you explained. It would be N = MOD(expr, num). I am not sure though if we should introduce the PARTITIONS clause. Usually Flink does not own the data and the partitions are already an intrinsic property of the underlying source e.g. for kafka we do not create topics, but we just describe pre-existing pre-partitioned topic. 4. timestamp vs timestamp.field vs connector.field vs ... I am fine with changing it to timestamp.field to be consistent with other value.fields and key.fields. Actually that was also my initial proposal in a first draft I prepared. I changed it afterwards to shorten the key. Best, Dawid On 03/03/2020 09:00, Danny Chan wrote: > Thanks Dawid for bringing up this discussion, I think it is a useful feature ~ > > About how the metadata outputs from source > > I think it is completely orthogonal, computed column push down is another topic, this should not be a blocker but a promotion, if we do not have any filters on the computed column, there is no need to do any pushings; the source node just emit the complete record with full metadata with the declared physical schema, then when generating the virtual columns, we would extract the metadata info and output as full columns(with full schema). > > About the type of metadata column > > Personally i prefer explicit type instead of CAST, they are symantic equivalent though, explict type is more straight-forward and we can declare the nullable attribute there. > > About option A: partitioning based on acomputed column VS option B: partitioning with just a function > > From the FLIP, it seems that B's partitioning is just a strategy when writing data, the partiton column is not included in the table schema, so it's just useless when reading from that. > > - Compared to A, we do not need to generate the partition column when selecting from the table(but insert into) > - For A we can also mark the column as STORED when we want to persist that > > So in my opition they are orthogonal, we can support both, i saw that MySQL/Oracle[1][2] would suggest to also define the PARTITIONS num, and the partitions are managed under a "tablenamespace", the partition in which the record is stored is partition number N, where N = MOD(expr, num), for your design, which partiton the record would persist ? > > [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html > [2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 > > Best, > Danny Chan > 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz <[hidden email]>,写道: >> Hi Jark, >> Ad. 2 I added a section to discuss relation to FLIP-63 >> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. Therefore you have the key.format.type. >> I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into an Option/Rejected alternatives. >> I agree timestamp, key.*, value.* are connector properties. Why I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is in the end a connector property as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or if we go with FLINK-12557: elasticsearch): >> elasticsearch.key.format.type: csv >> elasticsearch.key.format.field: .... >> elasticsearch.key.format.delimiter: .... >> elasticsearch.key.format.*: .... >> I am fine with doing it though if this is a preferred approach in the community. >> Ad in-line comments: >> I forgot to update the `value.fields.include` property. It should be value.fields-include. Which I think you also suggested in the comment, right? >> As for the cast vs declaring output type of computed column. I think it's better not to use CAST, but declare a type of an expression and later on infer the output type of SYSTEM_METADATA. The reason is I think this way it will be easier to implement e.g. filter push downs when working with the native types of the source, e.g. in case of Kafka's offset, i think it's better to pushdown long rather than string. This could let us push expression like e.g. offset > 12345 & offset < 59382. Otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover I think we need to introduce the type for computed columns anyway to support functions that infer output type based on expected return type. >> As for the computed column push down. Yes, SYSTEM_METADATA would have to be pushed down to the source. If it is not possible the planner should fail. As far as I know computed columns push down will be part of source rework, won't it? ;) >> As for the persisted computed column. I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a udf in partitioned by vs partitioned by a computed column is that when you partition by a computed column this column must be also computed when reading the table. If you use a udf in the partitioned by, the expression is computed only when inserting into the table. >> Hope this answers some of your questions. Looking forward for further suggestions. >> Best, >> Dawid >> >> >> On 02/03/2020 05:18, Jark Wu wrote: >>> Hi, >>> >>> Thanks Dawid for starting such a great discussion. Reaing metadata and >>> key-part information from source is an important feature for streaming >>> users. >>> >>> In general, I agree with the proposal of the FLIP. >>> I will leave my thoughts and comments here: >>> >>> 1) +1 to use connector properties instead of introducing HEADER keyword as >>> the reason you mentioned in the FLIP. >>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should add a >>> section to explain what's the relationship between them. >>> Do their concepts conflict? Could INSERT PARTITION be used on the >>> PARTITIONED table in this FLIP? >>> 3) Currently, properties are hierarchical in Flink SQL. Shall we make the >>> new introduced properties more hierarchical? >>> For example, "timestamp" => "connector.timestamp"? (actually, I prefer >>> "kafka.timestamp" which is another improvement for properties FLINK-12557) >>> A single "timestamp" in properties may mislead users that the field is >>> a rowtime attribute. >>> >>> I also left some minor comments in the FLIP. >>> >>> Thanks, >>> Jark >>> >>> >>> >>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <[hidden email]> >>> wrote: >>> >>>> Hi, >>>> >>>> I would like to propose an improvement that would enable reading table >>>> columns from different parts of source records. Besides the main payload >>>> majority (if not all of the sources) expose additional information. It >>>> can be simply a read-only metadata such as offset, ingestion time or a >>>> read and write parts of the record that contain data but additionally >>>> serve different purposes (partitioning, compaction etc.), e.g. key or >>>> timestamp in Kafka. >>>> >>>> We should make it possible to read and write data from all of those >>>> locations. In this proposal I discuss reading partitioning data, for >>>> completeness this proposal discusses also the partitioning when writing >>>> data out. >>>> >>>> I am looking forward to your comments. >>>> >>>> You can access the FLIP here: >>>> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode >>>> >>>> Best, >>>> >>>> Dawid >>>> >>>> >>>> signature.asc (849 bytes) Download Attachment |
Thanks Dawid,
I have two more questions. > SupportsMetadata Introducing SupportsMetadata sounds good to me. But I have some questions regarding to this interface. 1) How do the source know what the expected return type of each metadata? 2) Where to put the metadata fields? Append to the existing physical fields? If yes, I would suggest to change the signature to `TableSource appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)` > SYSTEM_METADATA("partition") Can SYSTEM_METADATA() function be used nested in a computed column expression? If yes, how to specify the return type of SYSTEM_METADATA? Best, Jark On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <[hidden email]> wrote: > Hi, > > 1. I thought a bit more on how the source would emit the columns and I > now see its not exactly the same as regular columns. I see a need to > elaborate a bit more on that in the FLIP as you asked, Jark. > > I do agree mostly with Danny on how we should do that. One additional > things I would introduce is an > > interface SupportsMetadata { > > boolean supportsMetadata(Set<String> metadataFields); > > TableSource generateMetadataFields(Set<String> metadataFields); > > } > > This way the source would have to declare/emit only the requested > metadata fields. In order not to clash with user defined fields. When > emitting the metadata field I would prepend the column name with > __system_{property_name}. Therefore when requested > SYSTEM_METADATA("partition") the source would append a field > __system_partition to the schema. This would be never visible to the > user as it would be used only for the subsequent computed columns. If > that makes sense to you, I will update the FLIP with this description. > > 2. CAST vs explicit type in computed columns > > Here I agree with Danny. It is also the current state of the proposal. > > 3. Partitioning on computed column vs function > > Here I also agree with Danny. I also think those are orthogonal. I would > leave out the STORED computed columns out of the discussion. I don't see > how do they relate to the partitioning. I already put both of those > cases in the document. We can either partition on a computed column or > use a udf in a partioned by clause. I am fine with leaving out the > partitioning by udf in the first version if you still have some concerns. > > As for your question Danny. It depends which partitioning strategy you use. > > For the HASH partitioning strategy I thought it would work as you > explained. It would be N = MOD(expr, num). I am not sure though if we > should introduce the PARTITIONS clause. Usually Flink does not own the > data and the partitions are already an intrinsic property of the > underlying source e.g. for kafka we do not create topics, but we just > describe pre-existing pre-partitioned topic. > > 4. timestamp vs timestamp.field vs connector.field vs ... > > I am fine with changing it to timestamp.field to be consistent with > other value.fields and key.fields. Actually that was also my initial > proposal in a first draft I prepared. I changed it afterwards to shorten > the key. > > Best, > > Dawid > > On 03/03/2020 09:00, Danny Chan wrote: > > Thanks Dawid for bringing up this discussion, I think it is a useful > feature ~ > > > > About how the metadata outputs from source > > > > I think it is completely orthogonal, computed column push down is > another topic, this should not be a blocker but a promotion, if we do not > have any filters on the computed column, there is no need to do any > pushings; the source node just emit the complete record with full metadata > with the declared physical schema, then when generating the virtual > columns, we would extract the metadata info and output as full columns(with > full schema). > > > > About the type of metadata column > > > > Personally i prefer explicit type instead of CAST, they are symantic > equivalent though, explict type is more straight-forward and we can declare > the nullable attribute there. > > > > About option A: partitioning based on acomputed column VS option B: > partitioning with just a function > > > > From the FLIP, it seems that B's partitioning is just a strategy when > writing data, the partiton column is not included in the table schema, so > it's just useless when reading from that. > > > > - Compared to A, we do not need to generate the partition column when > selecting from the table(but insert into) > > - For A we can also mark the column as STORED when we want to persist > that > > > > So in my opition they are orthogonal, we can support both, i saw that > MySQL/Oracle[1][2] would suggest to also define the PARTITIONS num, and the > partitions are managed under a "tablenamespace", the partition in which the > record is stored is partition number N, where N = MOD(expr, num), for your > design, which partiton the record would persist ? > > > > [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html > > [2] > https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 > > > > Best, > > Danny Chan > > 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz <[hidden email]>,写道: > >> Hi Jark, > >> Ad. 2 I added a section to discuss relation to FLIP-63 > >> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. > Therefore you have the key.format.type. > >> I also considered exactly what you are suggesting (prefixing with > connector or kafka). I should've put that into an Option/Rejected > alternatives. > >> I agree timestamp, key.*, value.* are connector properties. Why I > wanted to suggest not adding that prefix in the first version is that > actually all the properties in the WITH section are connector properties. > Even format is in the end a connector property as some of the sources might > not have a format, imo. The benefit of not adding the prefix is that it > makes the keys a bit shorter. Imagine prefixing all the properties with > connector (or if we go with FLINK-12557: elasticsearch): > >> elasticsearch.key.format.type: csv > >> elasticsearch.key.format.field: .... > >> elasticsearch.key.format.delimiter: .... > >> elasticsearch.key.format.*: .... > >> I am fine with doing it though if this is a preferred approach in the > community. > >> Ad in-line comments: > >> I forgot to update the `value.fields.include` property. It should be > value.fields-include. Which I think you also suggested in the comment, > right? > >> As for the cast vs declaring output type of computed column. I think > it's better not to use CAST, but declare a type of an expression and later > on infer the output type of SYSTEM_METADATA. The reason is I think this way > it will be easier to implement e.g. filter push downs when working with the > native types of the source, e.g. in case of Kafka's offset, i think it's > better to pushdown long rather than string. This could let us push > expression like e.g. offset > 12345 & offset < 59382. Otherwise we would > have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. > Moreover I think we need to introduce the type for computed columns anyway > to support functions that infer output type based on expected return type. > >> As for the computed column push down. Yes, SYSTEM_METADATA would have > to be pushed down to the source. If it is not possible the planner should > fail. As far as I know computed columns push down will be part of source > rework, won't it? ;) > >> As for the persisted computed column. I think it is completely > orthogonal. In my current proposal you can also partition by a computed > column. The difference between using a udf in partitioned by vs partitioned > by a computed column is that when you partition by a computed column this > column must be also computed when reading the table. If you use a udf in > the partitioned by, the expression is computed only when inserting into the > table. > >> Hope this answers some of your questions. Looking forward for further > suggestions. > >> Best, > >> Dawid > >> > >> > >> On 02/03/2020 05:18, Jark Wu wrote: > >>> Hi, > >>> > >>> Thanks Dawid for starting such a great discussion. Reaing metadata and > >>> key-part information from source is an important feature for streaming > >>> users. > >>> > >>> In general, I agree with the proposal of the FLIP. > >>> I will leave my thoughts and comments here: > >>> > >>> 1) +1 to use connector properties instead of introducing HEADER > keyword as > >>> the reason you mentioned in the FLIP. > >>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should > add a > >>> section to explain what's the relationship between them. > >>> Do their concepts conflict? Could INSERT PARTITION be used on the > >>> PARTITIONED table in this FLIP? > >>> 3) Currently, properties are hierarchical in Flink SQL. Shall we make > the > >>> new introduced properties more hierarchical? > >>> For example, "timestamp" => "connector.timestamp"? (actually, I > prefer > >>> "kafka.timestamp" which is another improvement for properties > FLINK-12557) > >>> A single "timestamp" in properties may mislead users that the field > is > >>> a rowtime attribute. > >>> > >>> I also left some minor comments in the FLIP. > >>> > >>> Thanks, > >>> Jark > >>> > >>> > >>> > >>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <[hidden email]> > >>> wrote: > >>> > >>>> Hi, > >>>> > >>>> I would like to propose an improvement that would enable reading table > >>>> columns from different parts of source records. Besides the main > payload > >>>> majority (if not all of the sources) expose additional information. It > >>>> can be simply a read-only metadata such as offset, ingestion time or a > >>>> read and write parts of the record that contain data but additionally > >>>> serve different purposes (partitioning, compaction etc.), e.g. key or > >>>> timestamp in Kafka. > >>>> > >>>> We should make it possible to read and write data from all of those > >>>> locations. In this proposal I discuss reading partitioning data, for > >>>> completeness this proposal discusses also the partitioning when > writing > >>>> data out. > >>>> > >>>> I am looking forward to your comments. > >>>> > >>>> You can access the FLIP here: > >>>> > >>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode > >>>> > >>>> Best, > >>>> > >>>> Dawid > >>>> > >>>> > >>>> > > |
Hi Dawid,
I have a couple of questions around key fields, actually I also have some other questions but want to be focused on key fields first. 1. I don't fully understand the usage of "key.fields". Is this option only valid during write operation? Because for reading, I can't imagine how such options can be applied. I would expect that there might be a SYSTEM_METADATA("key") to read and assign the key to a normal field? 2. If "key.fields" is only valid in write operation, I want to propose we can simplify the options to not introducing key.format.type and other related options. I think a single "key.field" (not fields) would be enough, users can use UDF to calculate whatever key they want before sink. 3. Also I don't want to introduce "value.format.type" and "value.format.xxx" with the "value" prefix. Not every connector has a concept of key and values. The old parameter "format.type" already good enough to use. Best, Kurt On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <[hidden email]> wrote: > Thanks Dawid, > > I have two more questions. > > > SupportsMetadata > Introducing SupportsMetadata sounds good to me. But I have some questions > regarding to this interface. > 1) How do the source know what the expected return type of each metadata? > 2) Where to put the metadata fields? Append to the existing physical > fields? > If yes, I would suggest to change the signature to `TableSource > appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)` > > > SYSTEM_METADATA("partition") > Can SYSTEM_METADATA() function be used nested in a computed column > expression? If yes, how to specify the return type of SYSTEM_METADATA? > > Best, > Jark > > On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <[hidden email]> > wrote: > > > Hi, > > > > 1. I thought a bit more on how the source would emit the columns and I > > now see its not exactly the same as regular columns. I see a need to > > elaborate a bit more on that in the FLIP as you asked, Jark. > > > > I do agree mostly with Danny on how we should do that. One additional > > things I would introduce is an > > > > interface SupportsMetadata { > > > > boolean supportsMetadata(Set<String> metadataFields); > > > > TableSource generateMetadataFields(Set<String> metadataFields); > > > > } > > > > This way the source would have to declare/emit only the requested > > metadata fields. In order not to clash with user defined fields. When > > emitting the metadata field I would prepend the column name with > > __system_{property_name}. Therefore when requested > > SYSTEM_METADATA("partition") the source would append a field > > __system_partition to the schema. This would be never visible to the > > user as it would be used only for the subsequent computed columns. If > > that makes sense to you, I will update the FLIP with this description. > > > > 2. CAST vs explicit type in computed columns > > > > Here I agree with Danny. It is also the current state of the proposal. > > > > 3. Partitioning on computed column vs function > > > > Here I also agree with Danny. I also think those are orthogonal. I would > > leave out the STORED computed columns out of the discussion. I don't see > > how do they relate to the partitioning. I already put both of those > > cases in the document. We can either partition on a computed column or > > use a udf in a partioned by clause. I am fine with leaving out the > > partitioning by udf in the first version if you still have some concerns. > > > > As for your question Danny. It depends which partitioning strategy you > use. > > > > For the HASH partitioning strategy I thought it would work as you > > explained. It would be N = MOD(expr, num). I am not sure though if we > > should introduce the PARTITIONS clause. Usually Flink does not own the > > data and the partitions are already an intrinsic property of the > > underlying source e.g. for kafka we do not create topics, but we just > > describe pre-existing pre-partitioned topic. > > > > 4. timestamp vs timestamp.field vs connector.field vs ... > > > > I am fine with changing it to timestamp.field to be consistent with > > other value.fields and key.fields. Actually that was also my initial > > proposal in a first draft I prepared. I changed it afterwards to shorten > > the key. > > > > Best, > > > > Dawid > > > > On 03/03/2020 09:00, Danny Chan wrote: > > > Thanks Dawid for bringing up this discussion, I think it is a useful > > feature ~ > > > > > > About how the metadata outputs from source > > > > > > I think it is completely orthogonal, computed column push down is > > another topic, this should not be a blocker but a promotion, if we do not > > have any filters on the computed column, there is no need to do any > > pushings; the source node just emit the complete record with full > metadata > > with the declared physical schema, then when generating the virtual > > columns, we would extract the metadata info and output as full > columns(with > > full schema). > > > > > > About the type of metadata column > > > > > > Personally i prefer explicit type instead of CAST, they are symantic > > equivalent though, explict type is more straight-forward and we can > declare > > the nullable attribute there. > > > > > > About option A: partitioning based on acomputed column VS option B: > > partitioning with just a function > > > > > > From the FLIP, it seems that B's partitioning is just a strategy when > > writing data, the partiton column is not included in the table schema, so > > it's just useless when reading from that. > > > > > > - Compared to A, we do not need to generate the partition column when > > selecting from the table(but insert into) > > > - For A we can also mark the column as STORED when we want to persist > > that > > > > > > So in my opition they are orthogonal, we can support both, i saw that > > MySQL/Oracle[1][2] would suggest to also define the PARTITIONS num, and > the > > partitions are managed under a "tablenamespace", the partition in which > the > > record is stored is partition number N, where N = MOD(expr, num), for > your > > design, which partiton the record would persist ? > > > > > > [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html > > > [2] > > > https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 > > > > > > Best, > > > Danny Chan > > > 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz <[hidden email]>,写道: > > >> Hi Jark, > > >> Ad. 2 I added a section to discuss relation to FLIP-63 > > >> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. > > Therefore you have the key.format.type. > > >> I also considered exactly what you are suggesting (prefixing with > > connector or kafka). I should've put that into an Option/Rejected > > alternatives. > > >> I agree timestamp, key.*, value.* are connector properties. Why I > > wanted to suggest not adding that prefix in the first version is that > > actually all the properties in the WITH section are connector properties. > > Even format is in the end a connector property as some of the sources > might > > not have a format, imo. The benefit of not adding the prefix is that it > > makes the keys a bit shorter. Imagine prefixing all the properties with > > connector (or if we go with FLINK-12557: elasticsearch): > > >> elasticsearch.key.format.type: csv > > >> elasticsearch.key.format.field: .... > > >> elasticsearch.key.format.delimiter: .... > > >> elasticsearch.key.format.*: .... > > >> I am fine with doing it though if this is a preferred approach in the > > community. > > >> Ad in-line comments: > > >> I forgot to update the `value.fields.include` property. It should be > > value.fields-include. Which I think you also suggested in the comment, > > right? > > >> As for the cast vs declaring output type of computed column. I think > > it's better not to use CAST, but declare a type of an expression and > later > > on infer the output type of SYSTEM_METADATA. The reason is I think this > way > > it will be easier to implement e.g. filter push downs when working with > the > > native types of the source, e.g. in case of Kafka's offset, i think it's > > better to pushdown long rather than string. This could let us push > > expression like e.g. offset > 12345 & offset < 59382. Otherwise we would > > have to push down cast(offset, long) > 12345 && cast(offset, long) < > 59382. > > Moreover I think we need to introduce the type for computed columns > anyway > > to support functions that infer output type based on expected return > type. > > >> As for the computed column push down. Yes, SYSTEM_METADATA would have > > to be pushed down to the source. If it is not possible the planner should > > fail. As far as I know computed columns push down will be part of source > > rework, won't it? ;) > > >> As for the persisted computed column. I think it is completely > > orthogonal. In my current proposal you can also partition by a computed > > column. The difference between using a udf in partitioned by vs > partitioned > > by a computed column is that when you partition by a computed column this > > column must be also computed when reading the table. If you use a udf in > > the partitioned by, the expression is computed only when inserting into > the > > table. > > >> Hope this answers some of your questions. Looking forward for further > > suggestions. > > >> Best, > > >> Dawid > > >> > > >> > > >> On 02/03/2020 05:18, Jark Wu wrote: > > >>> Hi, > > >>> > > >>> Thanks Dawid for starting such a great discussion. Reaing metadata > and > > >>> key-part information from source is an important feature for > streaming > > >>> users. > > >>> > > >>> In general, I agree with the proposal of the FLIP. > > >>> I will leave my thoughts and comments here: > > >>> > > >>> 1) +1 to use connector properties instead of introducing HEADER > > keyword as > > >>> the reason you mentioned in the FLIP. > > >>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should > > add a > > >>> section to explain what's the relationship between them. > > >>> Do their concepts conflict? Could INSERT PARTITION be used on the > > >>> PARTITIONED table in this FLIP? > > >>> 3) Currently, properties are hierarchical in Flink SQL. Shall we make > > the > > >>> new introduced properties more hierarchical? > > >>> For example, "timestamp" => "connector.timestamp"? (actually, I > > prefer > > >>> "kafka.timestamp" which is another improvement for properties > > FLINK-12557) > > >>> A single "timestamp" in properties may mislead users that the > field > > is > > >>> a rowtime attribute. > > >>> > > >>> I also left some minor comments in the FLIP. > > >>> > > >>> Thanks, > > >>> Jark > > >>> > > >>> > > >>> > > >>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz < > [hidden email]> > > >>> wrote: > > >>> > > >>>> Hi, > > >>>> > > >>>> I would like to propose an improvement that would enable reading > table > > >>>> columns from different parts of source records. Besides the main > > payload > > >>>> majority (if not all of the sources) expose additional information. > It > > >>>> can be simply a read-only metadata such as offset, ingestion time > or a > > >>>> read and write parts of the record that contain data but > additionally > > >>>> serve different purposes (partitioning, compaction etc.), e.g. key > or > > >>>> timestamp in Kafka. > > >>>> > > >>>> We should make it possible to read and write data from all of those > > >>>> locations. In this proposal I discuss reading partitioning data, for > > >>>> completeness this proposal discusses also the partitioning when > > writing > > >>>> data out. > > >>>> > > >>>> I am looking forward to your comments. > > >>>> > > >>>> You can access the FLIP here: > > >>>> > > >>>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode > > >>>> > > >>>> Best, > > >>>> > > >>>> Dawid > > >>>> > > >>>> > > >>>> > > > > > |
Sorry, forgot one question.
4. Can we make the value.fields-include more orthogonal? Like one can specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users can not config to just ignore timestamp but keep key. Best, Kurt On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <[hidden email]> wrote: > Hi Dawid, > > I have a couple of questions around key fields, actually I also have some > other questions but want to be focused on key fields first. > > 1. I don't fully understand the usage of "key.fields". Is this option only > valid during write operation? Because for > reading, I can't imagine how such options can be applied. I would expect > that there might be a SYSTEM_METADATA("key") > to read and assign the key to a normal field? > > 2. If "key.fields" is only valid in write operation, I want to propose we > can simplify the options to not introducing key.format.type and > other related options. I think a single "key.field" (not fields) would be > enough, users can use UDF to calculate whatever key they > want before sink. > > 3. Also I don't want to introduce "value.format.type" and > "value.format.xxx" with the "value" prefix. Not every connector has a > concept > of key and values. The old parameter "format.type" already good enough to > use. > > Best, > Kurt > > > On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <[hidden email]> wrote: > >> Thanks Dawid, >> >> I have two more questions. >> >> > SupportsMetadata >> Introducing SupportsMetadata sounds good to me. But I have some questions >> regarding to this interface. >> 1) How do the source know what the expected return type of each metadata? >> 2) Where to put the metadata fields? Append to the existing physical >> fields? >> If yes, I would suggest to change the signature to `TableSource >> appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)` >> >> > SYSTEM_METADATA("partition") >> Can SYSTEM_METADATA() function be used nested in a computed column >> expression? If yes, how to specify the return type of SYSTEM_METADATA? >> >> Best, >> Jark >> >> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <[hidden email]> >> wrote: >> >> > Hi, >> > >> > 1. I thought a bit more on how the source would emit the columns and I >> > now see its not exactly the same as regular columns. I see a need to >> > elaborate a bit more on that in the FLIP as you asked, Jark. >> > >> > I do agree mostly with Danny on how we should do that. One additional >> > things I would introduce is an >> > >> > interface SupportsMetadata { >> > >> > boolean supportsMetadata(Set<String> metadataFields); >> > >> > TableSource generateMetadataFields(Set<String> metadataFields); >> > >> > } >> > >> > This way the source would have to declare/emit only the requested >> > metadata fields. In order not to clash with user defined fields. When >> > emitting the metadata field I would prepend the column name with >> > __system_{property_name}. Therefore when requested >> > SYSTEM_METADATA("partition") the source would append a field >> > __system_partition to the schema. This would be never visible to the >> > user as it would be used only for the subsequent computed columns. If >> > that makes sense to you, I will update the FLIP with this description. >> > >> > 2. CAST vs explicit type in computed columns >> > >> > Here I agree with Danny. It is also the current state of the proposal. >> > >> > 3. Partitioning on computed column vs function >> > >> > Here I also agree with Danny. I also think those are orthogonal. I would >> > leave out the STORED computed columns out of the discussion. I don't see >> > how do they relate to the partitioning. I already put both of those >> > cases in the document. We can either partition on a computed column or >> > use a udf in a partioned by clause. I am fine with leaving out the >> > partitioning by udf in the first version if you still have some >> concerns. >> > >> > As for your question Danny. It depends which partitioning strategy you >> use. >> > >> > For the HASH partitioning strategy I thought it would work as you >> > explained. It would be N = MOD(expr, num). I am not sure though if we >> > should introduce the PARTITIONS clause. Usually Flink does not own the >> > data and the partitions are already an intrinsic property of the >> > underlying source e.g. for kafka we do not create topics, but we just >> > describe pre-existing pre-partitioned topic. >> > >> > 4. timestamp vs timestamp.field vs connector.field vs ... >> > >> > I am fine with changing it to timestamp.field to be consistent with >> > other value.fields and key.fields. Actually that was also my initial >> > proposal in a first draft I prepared. I changed it afterwards to shorten >> > the key. >> > >> > Best, >> > >> > Dawid >> > >> > On 03/03/2020 09:00, Danny Chan wrote: >> > > Thanks Dawid for bringing up this discussion, I think it is a useful >> > feature ~ >> > > >> > > About how the metadata outputs from source >> > > >> > > I think it is completely orthogonal, computed column push down is >> > another topic, this should not be a blocker but a promotion, if we do >> not >> > have any filters on the computed column, there is no need to do any >> > pushings; the source node just emit the complete record with full >> metadata >> > with the declared physical schema, then when generating the virtual >> > columns, we would extract the metadata info and output as full >> columns(with >> > full schema). >> > > >> > > About the type of metadata column >> > > >> > > Personally i prefer explicit type instead of CAST, they are symantic >> > equivalent though, explict type is more straight-forward and we can >> declare >> > the nullable attribute there. >> > > >> > > About option A: partitioning based on acomputed column VS option B: >> > partitioning with just a function >> > > >> > > From the FLIP, it seems that B's partitioning is just a strategy when >> > writing data, the partiton column is not included in the table schema, >> so >> > it's just useless when reading from that. >> > > >> > > - Compared to A, we do not need to generate the partition column when >> > selecting from the table(but insert into) >> > > - For A we can also mark the column as STORED when we want to persist >> > that >> > > >> > > So in my opition they are orthogonal, we can support both, i saw that >> > MySQL/Oracle[1][2] would suggest to also define the PARTITIONS num, and >> the >> > partitions are managed under a "tablenamespace", the partition in which >> the >> > record is stored is partition number N, where N = MOD(expr, num), for >> your >> > design, which partiton the record would persist ? >> > > >> > > [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html >> > > [2] >> > >> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 >> > > >> > > Best, >> > > Danny Chan >> > > 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz <[hidden email] >> >,写道: >> > >> Hi Jark, >> > >> Ad. 2 I added a section to discuss relation to FLIP-63 >> > >> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. >> > Therefore you have the key.format.type. >> > >> I also considered exactly what you are suggesting (prefixing with >> > connector or kafka). I should've put that into an Option/Rejected >> > alternatives. >> > >> I agree timestamp, key.*, value.* are connector properties. Why I >> > wanted to suggest not adding that prefix in the first version is that >> > actually all the properties in the WITH section are connector >> properties. >> > Even format is in the end a connector property as some of the sources >> might >> > not have a format, imo. The benefit of not adding the prefix is that it >> > makes the keys a bit shorter. Imagine prefixing all the properties with >> > connector (or if we go with FLINK-12557: elasticsearch): >> > >> elasticsearch.key.format.type: csv >> > >> elasticsearch.key.format.field: .... >> > >> elasticsearch.key.format.delimiter: .... >> > >> elasticsearch.key.format.*: .... >> > >> I am fine with doing it though if this is a preferred approach in the >> > community. >> > >> Ad in-line comments: >> > >> I forgot to update the `value.fields.include` property. It should be >> > value.fields-include. Which I think you also suggested in the comment, >> > right? >> > >> As for the cast vs declaring output type of computed column. I think >> > it's better not to use CAST, but declare a type of an expression and >> later >> > on infer the output type of SYSTEM_METADATA. The reason is I think this >> way >> > it will be easier to implement e.g. filter push downs when working with >> the >> > native types of the source, e.g. in case of Kafka's offset, i think it's >> > better to pushdown long rather than string. This could let us push >> > expression like e.g. offset > 12345 & offset < 59382. Otherwise we would >> > have to push down cast(offset, long) > 12345 && cast(offset, long) < >> 59382. >> > Moreover I think we need to introduce the type for computed columns >> anyway >> > to support functions that infer output type based on expected return >> type. >> > >> As for the computed column push down. Yes, SYSTEM_METADATA would have >> > to be pushed down to the source. If it is not possible the planner >> should >> > fail. As far as I know computed columns push down will be part of source >> > rework, won't it? ;) >> > >> As for the persisted computed column. I think it is completely >> > orthogonal. In my current proposal you can also partition by a computed >> > column. The difference between using a udf in partitioned by vs >> partitioned >> > by a computed column is that when you partition by a computed column >> this >> > column must be also computed when reading the table. If you use a udf in >> > the partitioned by, the expression is computed only when inserting into >> the >> > table. >> > >> Hope this answers some of your questions. Looking forward for further >> > suggestions. >> > >> Best, >> > >> Dawid >> > >> >> > >> >> > >> On 02/03/2020 05:18, Jark Wu wrote: >> > >>> Hi, >> > >>> >> > >>> Thanks Dawid for starting such a great discussion. Reaing metadata >> and >> > >>> key-part information from source is an important feature for >> streaming >> > >>> users. >> > >>> >> > >>> In general, I agree with the proposal of the FLIP. >> > >>> I will leave my thoughts and comments here: >> > >>> >> > >>> 1) +1 to use connector properties instead of introducing HEADER >> > keyword as >> > >>> the reason you mentioned in the FLIP. >> > >>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should >> > add a >> > >>> section to explain what's the relationship between them. >> > >>> Do their concepts conflict? Could INSERT PARTITION be used on the >> > >>> PARTITIONED table in this FLIP? >> > >>> 3) Currently, properties are hierarchical in Flink SQL. Shall we >> make >> > the >> > >>> new introduced properties more hierarchical? >> > >>> For example, "timestamp" => "connector.timestamp"? (actually, I >> > prefer >> > >>> "kafka.timestamp" which is another improvement for properties >> > FLINK-12557) >> > >>> A single "timestamp" in properties may mislead users that the >> field >> > is >> > >>> a rowtime attribute. >> > >>> >> > >>> I also left some minor comments in the FLIP. >> > >>> >> > >>> Thanks, >> > >>> Jark >> > >>> >> > >>> >> > >>> >> > >>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz < >> [hidden email]> >> > >>> wrote: >> > >>> >> > >>>> Hi, >> > >>>> >> > >>>> I would like to propose an improvement that would enable reading >> table >> > >>>> columns from different parts of source records. Besides the main >> > payload >> > >>>> majority (if not all of the sources) expose additional >> information. It >> > >>>> can be simply a read-only metadata such as offset, ingestion time >> or a >> > >>>> read and write parts of the record that contain data but >> additionally >> > >>>> serve different purposes (partitioning, compaction etc.), e.g. key >> or >> > >>>> timestamp in Kafka. >> > >>>> >> > >>>> We should make it possible to read and write data from all of those >> > >>>> locations. In this proposal I discuss reading partitioning data, >> for >> > >>>> completeness this proposal discusses also the partitioning when >> > writing >> > >>>> data out. >> > >>>> >> > >>>> I am looking forward to your comments. >> > >>>> >> > >>>> You can access the FLIP here: >> > >>>> >> > >>>> >> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode >> > >>>> >> > >>>> Best, >> > >>>> >> > >>>> Dawid >> > >>>> >> > >>>> >> > >>>> >> > >> > >> > |
Hi everyone,
I completely reworked FLIP-107. It now covers the full story how to read and write metadata from/to connectors and formats. It considers all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partitioning for now. Looking forward to your feedback. Regards, Timo On 04.03.20 09:45, Kurt Young wrote: > Sorry, forgot one question. > > 4. Can we make the value.fields-include more orthogonal? Like one can > specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". > With current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users can not config to > just ignore timestamp but keep key. > > Best, > Kurt > > > On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <[hidden email]> wrote: > >> Hi Dawid, >> >> I have a couple of questions around key fields, actually I also have some >> other questions but want to be focused on key fields first. >> >> 1. I don't fully understand the usage of "key.fields". Is this option only >> valid during write operation? Because for >> reading, I can't imagine how such options can be applied. I would expect >> that there might be a SYSTEM_METADATA("key") >> to read and assign the key to a normal field? >> >> 2. If "key.fields" is only valid in write operation, I want to propose we >> can simplify the options to not introducing key.format.type and >> other related options. I think a single "key.field" (not fields) would be >> enough, users can use UDF to calculate whatever key they >> want before sink. >> >> 3. Also I don't want to introduce "value.format.type" and >> "value.format.xxx" with the "value" prefix. Not every connector has a >> concept >> of key and values. The old parameter "format.type" already good enough to >> use. >> >> Best, >> Kurt >> >> >> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <[hidden email]> wrote: >> >>> Thanks Dawid, >>> >>> I have two more questions. >>> >>>> SupportsMetadata >>> Introducing SupportsMetadata sounds good to me. But I have some questions >>> regarding to this interface. >>> 1) How do the source know what the expected return type of each metadata? >>> 2) Where to put the metadata fields? Append to the existing physical >>> fields? >>> If yes, I would suggest to change the signature to `TableSource >>> appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)` >>> >>>> SYSTEM_METADATA("partition") >>> Can SYSTEM_METADATA() function be used nested in a computed column >>> expression? If yes, how to specify the return type of SYSTEM_METADATA? >>> >>> Best, >>> Jark >>> >>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <[hidden email]> >>> wrote: >>> >>>> Hi, >>>> >>>> 1. I thought a bit more on how the source would emit the columns and I >>>> now see its not exactly the same as regular columns. I see a need to >>>> elaborate a bit more on that in the FLIP as you asked, Jark. >>>> >>>> I do agree mostly with Danny on how we should do that. One additional >>>> things I would introduce is an >>>> >>>> interface SupportsMetadata { >>>> >>>> boolean supportsMetadata(Set<String> metadataFields); >>>> >>>> TableSource generateMetadataFields(Set<String> metadataFields); >>>> >>>> } >>>> >>>> This way the source would have to declare/emit only the requested >>>> metadata fields. In order not to clash with user defined fields. When >>>> emitting the metadata field I would prepend the column name with >>>> __system_{property_name}. Therefore when requested >>>> SYSTEM_METADATA("partition") the source would append a field >>>> __system_partition to the schema. This would be never visible to the >>>> user as it would be used only for the subsequent computed columns. If >>>> that makes sense to you, I will update the FLIP with this description. >>>> >>>> 2. CAST vs explicit type in computed columns >>>> >>>> Here I agree with Danny. It is also the current state of the proposal. >>>> >>>> 3. Partitioning on computed column vs function >>>> >>>> Here I also agree with Danny. I also think those are orthogonal. I would >>>> leave out the STORED computed columns out of the discussion. I don't see >>>> how do they relate to the partitioning. I already put both of those >>>> cases in the document. We can either partition on a computed column or >>>> use a udf in a partioned by clause. I am fine with leaving out the >>>> partitioning by udf in the first version if you still have some >>> concerns. >>>> >>>> As for your question Danny. It depends which partitioning strategy you >>> use. >>>> >>>> For the HASH partitioning strategy I thought it would work as you >>>> explained. It would be N = MOD(expr, num). I am not sure though if we >>>> should introduce the PARTITIONS clause. Usually Flink does not own the >>>> data and the partitions are already an intrinsic property of the >>>> underlying source e.g. for kafka we do not create topics, but we just >>>> describe pre-existing pre-partitioned topic. >>>> >>>> 4. timestamp vs timestamp.field vs connector.field vs ... >>>> >>>> I am fine with changing it to timestamp.field to be consistent with >>>> other value.fields and key.fields. Actually that was also my initial >>>> proposal in a first draft I prepared. I changed it afterwards to shorten >>>> the key. >>>> >>>> Best, >>>> >>>> Dawid >>>> >>>> On 03/03/2020 09:00, Danny Chan wrote: >>>>> Thanks Dawid for bringing up this discussion, I think it is a useful >>>> feature ~ >>>>> >>>>> About how the metadata outputs from source >>>>> >>>>> I think it is completely orthogonal, computed column push down is >>>> another topic, this should not be a blocker but a promotion, if we do >>> not >>>> have any filters on the computed column, there is no need to do any >>>> pushings; the source node just emit the complete record with full >>> metadata >>>> with the declared physical schema, then when generating the virtual >>>> columns, we would extract the metadata info and output as full >>> columns(with >>>> full schema). >>>>> >>>>> About the type of metadata column >>>>> >>>>> Personally i prefer explicit type instead of CAST, they are symantic >>>> equivalent though, explict type is more straight-forward and we can >>> declare >>>> the nullable attribute there. >>>>> >>>>> About option A: partitioning based on acomputed column VS option B: >>>> partitioning with just a function >>>>> >>>>> From the FLIP, it seems that B's partitioning is just a strategy when >>>> writing data, the partiton column is not included in the table schema, >>> so >>>> it's just useless when reading from that. >>>>> >>>>> - Compared to A, we do not need to generate the partition column when >>>> selecting from the table(but insert into) >>>>> - For A we can also mark the column as STORED when we want to persist >>>> that >>>>> >>>>> So in my opition they are orthogonal, we can support both, i saw that >>>> MySQL/Oracle[1][2] would suggest to also define the PARTITIONS num, and >>> the >>>> partitions are managed under a "tablenamespace", the partition in which >>> the >>>> record is stored is partition number N, where N = MOD(expr, num), for >>> your >>>> design, which partiton the record would persist ? >>>>> >>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html >>>>> [2] >>>> >>> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 >>>>> >>>>> Best, >>>>> Danny Chan >>>>> 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz <[hidden email] >>>> ,写道: >>>>>> Hi Jark, >>>>>> Ad. 2 I added a section to discuss relation to FLIP-63 >>>>>> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. >>>> Therefore you have the key.format.type. >>>>>> I also considered exactly what you are suggesting (prefixing with >>>> connector or kafka). I should've put that into an Option/Rejected >>>> alternatives. >>>>>> I agree timestamp, key.*, value.* are connector properties. Why I >>>> wanted to suggest not adding that prefix in the first version is that >>>> actually all the properties in the WITH section are connector >>> properties. >>>> Even format is in the end a connector property as some of the sources >>> might >>>> not have a format, imo. The benefit of not adding the prefix is that it >>>> makes the keys a bit shorter. Imagine prefixing all the properties with >>>> connector (or if we go with FLINK-12557: elasticsearch): >>>>>> elasticsearch.key.format.type: csv >>>>>> elasticsearch.key.format.field: .... >>>>>> elasticsearch.key.format.delimiter: .... >>>>>> elasticsearch.key.format.*: .... >>>>>> I am fine with doing it though if this is a preferred approach in the >>>> community. >>>>>> Ad in-line comments: >>>>>> I forgot to update the `value.fields.include` property. It should be >>>> value.fields-include. Which I think you also suggested in the comment, >>>> right? >>>>>> As for the cast vs declaring output type of computed column. I think >>>> it's better not to use CAST, but declare a type of an expression and >>> later >>>> on infer the output type of SYSTEM_METADATA. The reason is I think this >>> way >>>> it will be easier to implement e.g. filter push downs when working with >>> the >>>> native types of the source, e.g. in case of Kafka's offset, i think it's >>>> better to pushdown long rather than string. This could let us push >>>> expression like e.g. offset > 12345 & offset < 59382. Otherwise we would >>>> have to push down cast(offset, long) > 12345 && cast(offset, long) < >>> 59382. >>>> Moreover I think we need to introduce the type for computed columns >>> anyway >>>> to support functions that infer output type based on expected return >>> type. >>>>>> As for the computed column push down. Yes, SYSTEM_METADATA would have >>>> to be pushed down to the source. If it is not possible the planner >>> should >>>> fail. As far as I know computed columns push down will be part of source >>>> rework, won't it? ;) >>>>>> As for the persisted computed column. I think it is completely >>>> orthogonal. In my current proposal you can also partition by a computed >>>> column. The difference between using a udf in partitioned by vs >>> partitioned >>>> by a computed column is that when you partition by a computed column >>> this >>>> column must be also computed when reading the table. If you use a udf in >>>> the partitioned by, the expression is computed only when inserting into >>> the >>>> table. >>>>>> Hope this answers some of your questions. Looking forward for further >>>> suggestions. >>>>>> Best, >>>>>> Dawid >>>>>> >>>>>> >>>>>> On 02/03/2020 05:18, Jark Wu wrote: >>>>>>> Hi, >>>>>>> >>>>>>> Thanks Dawid for starting such a great discussion. Reaing metadata >>> and >>>>>>> key-part information from source is an important feature for >>> streaming >>>>>>> users. >>>>>>> >>>>>>> In general, I agree with the proposal of the FLIP. >>>>>>> I will leave my thoughts and comments here: >>>>>>> >>>>>>> 1) +1 to use connector properties instead of introducing HEADER >>>> keyword as >>>>>>> the reason you mentioned in the FLIP. >>>>>>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we should >>>> add a >>>>>>> section to explain what's the relationship between them. >>>>>>> Do their concepts conflict? Could INSERT PARTITION be used on the >>>>>>> PARTITIONED table in this FLIP? >>>>>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we >>> make >>>> the >>>>>>> new introduced properties more hierarchical? >>>>>>> For example, "timestamp" => "connector.timestamp"? (actually, I >>>> prefer >>>>>>> "kafka.timestamp" which is another improvement for properties >>>> FLINK-12557) >>>>>>> A single "timestamp" in properties may mislead users that the >>> field >>>> is >>>>>>> a rowtime attribute. >>>>>>> >>>>>>> I also left some minor comments in the FLIP. >>>>>>> >>>>>>> Thanks, >>>>>>> Jark >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz < >>> [hidden email]> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> >>>>>>>> I would like to propose an improvement that would enable reading >>> table >>>>>>>> columns from different parts of source records. Besides the main >>>> payload >>>>>>>> majority (if not all of the sources) expose additional >>> information. It >>>>>>>> can be simply a read-only metadata such as offset, ingestion time >>> or a >>>>>>>> read and write parts of the record that contain data but >>> additionally >>>>>>>> serve different purposes (partitioning, compaction etc.), e.g. key >>> or >>>>>>>> timestamp in Kafka. >>>>>>>> >>>>>>>> We should make it possible to read and write data from all of those >>>>>>>> locations. In this proposal I discuss reading partitioning data, >>> for >>>>>>>> completeness this proposal discusses also the partitioning when >>>> writing >>>>>>>> data out. >>>>>>>> >>>>>>>> I am looking forward to your comments. >>>>>>>> >>>>>>>> You can access the FLIP here: >>>>>>>> >>>>>>>> >>>> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode >>>>>>>> >>>>>>>> Best, >>>>>>>> >>>>>>>> Dawid >>>>>>>> >>>>>>>> >>>>>>>> >>>> >>>> >>> >> > |
Hi Timo,
Thank you very much for the update. It indeed covers the full story in more details. I agree with the proposal. On 04/09/2020 10:48, Timo Walther wrote: > Hi everyone, > > I completely reworked FLIP-107. It now covers the full story how to > read and write metadata from/to connectors and formats. It considers > all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It > introduces the concept of PERSISTED computed columns and leaves out > partitioning for now. > > Looking forward to your feedback. > > Regards, > Timo > > > On 04.03.20 09:45, Kurt Young wrote: >> Sorry, forgot one question. >> >> 4. Can we make the value.fields-include more orthogonal? Like one can >> specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". >> With current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users can not >> config to >> just ignore timestamp but keep key. >> >> Best, >> Kurt >> >> >> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <[hidden email]> wrote: >> >>> Hi Dawid, >>> >>> I have a couple of questions around key fields, actually I also have >>> some >>> other questions but want to be focused on key fields first. >>> >>> 1. I don't fully understand the usage of "key.fields". Is this >>> option only >>> valid during write operation? Because for >>> reading, I can't imagine how such options can be applied. I would >>> expect >>> that there might be a SYSTEM_METADATA("key") >>> to read and assign the key to a normal field? >>> >>> 2. If "key.fields" is only valid in write operation, I want to >>> propose we >>> can simplify the options to not introducing key.format.type and >>> other related options. I think a single "key.field" (not fields) >>> would be >>> enough, users can use UDF to calculate whatever key they >>> want before sink. >>> >>> 3. Also I don't want to introduce "value.format.type" and >>> "value.format.xxx" with the "value" prefix. Not every connector has a >>> concept >>> of key and values. The old parameter "format.type" already good >>> enough to >>> use. >>> >>> Best, >>> Kurt >>> >>> >>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <[hidden email]> wrote: >>> >>>> Thanks Dawid, >>>> >>>> I have two more questions. >>>> >>>>> SupportsMetadata >>>> Introducing SupportsMetadata sounds good to me. But I have some >>>> questions >>>> regarding to this interface. >>>> 1) How do the source know what the expected return type of each >>>> metadata? >>>> 2) Where to put the metadata fields? Append to the existing physical >>>> fields? >>>> If yes, I would suggest to change the signature to `TableSource >>>> appendMetadataFields(String[] metadataNames, DataType[] >>>> metadataTypes)` >>>> >>>>> SYSTEM_METADATA("partition") >>>> Can SYSTEM_METADATA() function be used nested in a computed column >>>> expression? If yes, how to specify the return type of SYSTEM_METADATA? >>>> >>>> Best, >>>> Jark >>>> >>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <[hidden email]> >>>> wrote: >>>> >>>>> Hi, >>>>> >>>>> 1. I thought a bit more on how the source would emit the columns >>>>> and I >>>>> now see its not exactly the same as regular columns. I see a need to >>>>> elaborate a bit more on that in the FLIP as you asked, Jark. >>>>> >>>>> I do agree mostly with Danny on how we should do that. One additional >>>>> things I would introduce is an >>>>> >>>>> interface SupportsMetadata { >>>>> >>>>> boolean supportsMetadata(Set<String> metadataFields); >>>>> >>>>> TableSource generateMetadataFields(Set<String> metadataFields); >>>>> >>>>> } >>>>> >>>>> This way the source would have to declare/emit only the requested >>>>> metadata fields. In order not to clash with user defined fields. When >>>>> emitting the metadata field I would prepend the column name with >>>>> __system_{property_name}. Therefore when requested >>>>> SYSTEM_METADATA("partition") the source would append a field >>>>> __system_partition to the schema. This would be never visible to the >>>>> user as it would be used only for the subsequent computed columns. If >>>>> that makes sense to you, I will update the FLIP with this >>>>> description. >>>>> >>>>> 2. CAST vs explicit type in computed columns >>>>> >>>>> Here I agree with Danny. It is also the current state of the >>>>> proposal. >>>>> >>>>> 3. Partitioning on computed column vs function >>>>> >>>>> Here I also agree with Danny. I also think those are orthogonal. I >>>>> would >>>>> leave out the STORED computed columns out of the discussion. I >>>>> don't see >>>>> how do they relate to the partitioning. I already put both of those >>>>> cases in the document. We can either partition on a computed >>>>> column or >>>>> use a udf in a partioned by clause. I am fine with leaving out the >>>>> partitioning by udf in the first version if you still have some >>>> concerns. >>>>> >>>>> As for your question Danny. It depends which partitioning strategy >>>>> you >>>> use. >>>>> >>>>> For the HASH partitioning strategy I thought it would work as you >>>>> explained. It would be N = MOD(expr, num). I am not sure though if we >>>>> should introduce the PARTITIONS clause. Usually Flink does not own >>>>> the >>>>> data and the partitions are already an intrinsic property of the >>>>> underlying source e.g. for kafka we do not create topics, but we just >>>>> describe pre-existing pre-partitioned topic. >>>>> >>>>> 4. timestamp vs timestamp.field vs connector.field vs ... >>>>> >>>>> I am fine with changing it to timestamp.field to be consistent with >>>>> other value.fields and key.fields. Actually that was also my initial >>>>> proposal in a first draft I prepared. I changed it afterwards to >>>>> shorten >>>>> the key. >>>>> >>>>> Best, >>>>> >>>>> Dawid >>>>> >>>>> On 03/03/2020 09:00, Danny Chan wrote: >>>>>> Thanks Dawid for bringing up this discussion, I think it is a useful >>>>> feature ~ >>>>>> >>>>>> About how the metadata outputs from source >>>>>> >>>>>> I think it is completely orthogonal, computed column push down is >>>>> another topic, this should not be a blocker but a promotion, if we do >>>> not >>>>> have any filters on the computed column, there is no need to do any >>>>> pushings; the source node just emit the complete record with full >>>> metadata >>>>> with the declared physical schema, then when generating the virtual >>>>> columns, we would extract the metadata info and output as full >>>> columns(with >>>>> full schema). >>>>>> >>>>>> About the type of metadata column >>>>>> >>>>>> Personally i prefer explicit type instead of CAST, they are symantic >>>>> equivalent though, explict type is more straight-forward and we can >>>> declare >>>>> the nullable attribute there. >>>>>> >>>>>> About option A: partitioning based on acomputed column VS option B: >>>>> partitioning with just a function >>>>>> >>>>>> From the FLIP, it seems that B's partitioning is just a strategy >>>>>> when >>>>> writing data, the partiton column is not included in the table >>>>> schema, >>>> so >>>>> it's just useless when reading from that. >>>>>> >>>>>> - Compared to A, we do not need to generate the partition column >>>>>> when >>>>> selecting from the table(but insert into) >>>>>> - For A we can also mark the column as STORED when we want to >>>>>> persist >>>>> that >>>>>> >>>>>> So in my opition they are orthogonal, we can support both, i saw >>>>>> that >>>>> MySQL/Oracle[1][2] would suggest to also define the PARTITIONS >>>>> num, and >>>> the >>>>> partitions are managed under a "tablenamespace", the partition in >>>>> which >>>> the >>>>> record is stored is partition number N, where N = MOD(expr, num), for >>>> your >>>>> design, which partiton the record would persist ? >>>>>> >>>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html >>>>>> [2] >>>>> >>>> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 >>>> >>>>>> >>>>>> Best, >>>>>> Danny Chan >>>>>> 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz >>>>>> <[hidden email] >>>>> ,写道: >>>>>>> Hi Jark, >>>>>>> Ad. 2 I added a section to discuss relation to FLIP-63 >>>>>>> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. >>>>> Therefore you have the key.format.type. >>>>>>> I also considered exactly what you are suggesting (prefixing with >>>>> connector or kafka). I should've put that into an Option/Rejected >>>>> alternatives. >>>>>>> I agree timestamp, key.*, value.* are connector properties. Why I >>>>> wanted to suggest not adding that prefix in the first version is that >>>>> actually all the properties in the WITH section are connector >>>> properties. >>>>> Even format is in the end a connector property as some of the sources >>>> might >>>>> not have a format, imo. The benefit of not adding the prefix is >>>>> that it >>>>> makes the keys a bit shorter. Imagine prefixing all the properties >>>>> with >>>>> connector (or if we go with FLINK-12557: elasticsearch): >>>>>>> elasticsearch.key.format.type: csv >>>>>>> elasticsearch.key.format.field: .... >>>>>>> elasticsearch.key.format.delimiter: .... >>>>>>> elasticsearch.key.format.*: .... >>>>>>> I am fine with doing it though if this is a preferred approach >>>>>>> in the >>>>> community. >>>>>>> Ad in-line comments: >>>>>>> I forgot to update the `value.fields.include` property. It >>>>>>> should be >>>>> value.fields-include. Which I think you also suggested in the >>>>> comment, >>>>> right? >>>>>>> As for the cast vs declaring output type of computed column. I >>>>>>> think >>>>> it's better not to use CAST, but declare a type of an expression and >>>> later >>>>> on infer the output type of SYSTEM_METADATA. The reason is I think >>>>> this >>>> way >>>>> it will be easier to implement e.g. filter push downs when working >>>>> with >>>> the >>>>> native types of the source, e.g. in case of Kafka's offset, i >>>>> think it's >>>>> better to pushdown long rather than string. This could let us push >>>>> expression like e.g. offset > 12345 & offset < 59382. Otherwise we >>>>> would >>>>> have to push down cast(offset, long) > 12345 && cast(offset, long) < >>>> 59382. >>>>> Moreover I think we need to introduce the type for computed columns >>>> anyway >>>>> to support functions that infer output type based on expected return >>>> type. >>>>>>> As for the computed column push down. Yes, SYSTEM_METADATA would >>>>>>> have >>>>> to be pushed down to the source. If it is not possible the planner >>>> should >>>>> fail. As far as I know computed columns push down will be part of >>>>> source >>>>> rework, won't it? ;) >>>>>>> As for the persisted computed column. I think it is completely >>>>> orthogonal. In my current proposal you can also partition by a >>>>> computed >>>>> column. The difference between using a udf in partitioned by vs >>>> partitioned >>>>> by a computed column is that when you partition by a computed column >>>> this >>>>> column must be also computed when reading the table. If you use a >>>>> udf in >>>>> the partitioned by, the expression is computed only when inserting >>>>> into >>>> the >>>>> table. >>>>>>> Hope this answers some of your questions. Looking forward for >>>>>>> further >>>>> suggestions. >>>>>>> Best, >>>>>>> Dawid >>>>>>> >>>>>>> >>>>>>> On 02/03/2020 05:18, Jark Wu wrote: >>>>>>>> Hi, >>>>>>>> >>>>>>>> Thanks Dawid for starting such a great discussion. Reaing metadata >>>> and >>>>>>>> key-part information from source is an important feature for >>>> streaming >>>>>>>> users. >>>>>>>> >>>>>>>> In general, I agree with the proposal of the FLIP. >>>>>>>> I will leave my thoughts and comments here: >>>>>>>> >>>>>>>> 1) +1 to use connector properties instead of introducing HEADER >>>>> keyword as >>>>>>>> the reason you mentioned in the FLIP. >>>>>>>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we >>>>>>>> should >>>>> add a >>>>>>>> section to explain what's the relationship between them. >>>>>>>> Do their concepts conflict? Could INSERT PARTITION be used >>>>>>>> on the >>>>>>>> PARTITIONED table in this FLIP? >>>>>>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we >>>> make >>>>> the >>>>>>>> new introduced properties more hierarchical? >>>>>>>> For example, "timestamp" => "connector.timestamp"? >>>>>>>> (actually, I >>>>> prefer >>>>>>>> "kafka.timestamp" which is another improvement for properties >>>>> FLINK-12557) >>>>>>>> A single "timestamp" in properties may mislead users that the >>>> field >>>>> is >>>>>>>> a rowtime attribute. >>>>>>>> >>>>>>>> I also left some minor comments in the FLIP. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Jark >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz < >>>> [hidden email]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> I would like to propose an improvement that would enable reading >>>> table >>>>>>>>> columns from different parts of source records. Besides the main >>>>> payload >>>>>>>>> majority (if not all of the sources) expose additional >>>> information. It >>>>>>>>> can be simply a read-only metadata such as offset, ingestion time >>>> or a >>>>>>>>> read and write parts of the record that contain data but >>>> additionally >>>>>>>>> serve different purposes (partitioning, compaction etc.), e.g. >>>>>>>>> key >>>> or >>>>>>>>> timestamp in Kafka. >>>>>>>>> >>>>>>>>> We should make it possible to read and write data from all of >>>>>>>>> those >>>>>>>>> locations. In this proposal I discuss reading partitioning data, >>>> for >>>>>>>>> completeness this proposal discusses also the partitioning when >>>>> writing >>>>>>>>> data out. >>>>>>>>> >>>>>>>>> I am looking forward to your comments. >>>>>>>>> >>>>>>>>> You can access the FLIP here: >>>>>>>>> >>>>>>>>> >>>>> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode >>>> >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> >>>>>>>>> Dawid >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>> >>>>> >>>> >>> >> > signature.asc (849 bytes) Download Attachment |
I like the proposal! I didn't check the implementation section in detail
but the SQL DDL examples look good as well as the options for specifying how fields are mapped to keys/values look good. Aljoscha On 04.09.20 11:47, Dawid Wysakowicz wrote: > Hi Timo, > > Thank you very much for the update. It indeed covers the full story in > more details. I agree with the proposal. > > On 04/09/2020 10:48, Timo Walther wrote: >> Hi everyone, >> >> I completely reworked FLIP-107. It now covers the full story how to >> read and write metadata from/to connectors and formats. It considers >> all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It >> introduces the concept of PERSISTED computed columns and leaves out >> partitioning for now. >> >> Looking forward to your feedback. >> >> Regards, >> Timo >> >> >> On 04.03.20 09:45, Kurt Young wrote: >>> Sorry, forgot one question. >>> >>> 4. Can we make the value.fields-include more orthogonal? Like one can >>> specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". >>> With current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users can not >>> config to >>> just ignore timestamp but keep key. >>> >>> Best, >>> Kurt >>> >>> >>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <[hidden email]> wrote: >>> >>>> Hi Dawid, >>>> >>>> I have a couple of questions around key fields, actually I also have >>>> some >>>> other questions but want to be focused on key fields first. >>>> >>>> 1. I don't fully understand the usage of "key.fields". Is this >>>> option only >>>> valid during write operation? Because for >>>> reading, I can't imagine how such options can be applied. I would >>>> expect >>>> that there might be a SYSTEM_METADATA("key") >>>> to read and assign the key to a normal field? >>>> >>>> 2. If "key.fields" is only valid in write operation, I want to >>>> propose we >>>> can simplify the options to not introducing key.format.type and >>>> other related options. I think a single "key.field" (not fields) >>>> would be >>>> enough, users can use UDF to calculate whatever key they >>>> want before sink. >>>> >>>> 3. Also I don't want to introduce "value.format.type" and >>>> "value.format.xxx" with the "value" prefix. Not every connector has a >>>> concept >>>> of key and values. The old parameter "format.type" already good >>>> enough to >>>> use. >>>> >>>> Best, >>>> Kurt >>>> >>>> >>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <[hidden email]> wrote: >>>> >>>>> Thanks Dawid, >>>>> >>>>> I have two more questions. >>>>> >>>>>> SupportsMetadata >>>>> Introducing SupportsMetadata sounds good to me. But I have some >>>>> questions >>>>> regarding to this interface. >>>>> 1) How do the source know what the expected return type of each >>>>> metadata? >>>>> 2) Where to put the metadata fields? Append to the existing physical >>>>> fields? >>>>> If yes, I would suggest to change the signature to `TableSource >>>>> appendMetadataFields(String[] metadataNames, DataType[] >>>>> metadataTypes)` >>>>> >>>>>> SYSTEM_METADATA("partition") >>>>> Can SYSTEM_METADATA() function be used nested in a computed column >>>>> expression? If yes, how to specify the return type of SYSTEM_METADATA? >>>>> >>>>> Best, >>>>> Jark >>>>> >>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <[hidden email]> >>>>> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> 1. I thought a bit more on how the source would emit the columns >>>>>> and I >>>>>> now see its not exactly the same as regular columns. I see a need to >>>>>> elaborate a bit more on that in the FLIP as you asked, Jark. >>>>>> >>>>>> I do agree mostly with Danny on how we should do that. One additional >>>>>> things I would introduce is an >>>>>> >>>>>> interface SupportsMetadata { >>>>>> >>>>>> boolean supportsMetadata(Set<String> metadataFields); >>>>>> >>>>>> TableSource generateMetadataFields(Set<String> metadataFields); >>>>>> >>>>>> } >>>>>> >>>>>> This way the source would have to declare/emit only the requested >>>>>> metadata fields. In order not to clash with user defined fields. When >>>>>> emitting the metadata field I would prepend the column name with >>>>>> __system_{property_name}. Therefore when requested >>>>>> SYSTEM_METADATA("partition") the source would append a field >>>>>> __system_partition to the schema. This would be never visible to the >>>>>> user as it would be used only for the subsequent computed columns. If >>>>>> that makes sense to you, I will update the FLIP with this >>>>>> description. >>>>>> >>>>>> 2. CAST vs explicit type in computed columns >>>>>> >>>>>> Here I agree with Danny. It is also the current state of the >>>>>> proposal. >>>>>> >>>>>> 3. Partitioning on computed column vs function >>>>>> >>>>>> Here I also agree with Danny. I also think those are orthogonal. I >>>>>> would >>>>>> leave out the STORED computed columns out of the discussion. I >>>>>> don't see >>>>>> how do they relate to the partitioning. I already put both of those >>>>>> cases in the document. We can either partition on a computed >>>>>> column or >>>>>> use a udf in a partioned by clause. I am fine with leaving out the >>>>>> partitioning by udf in the first version if you still have some >>>>> concerns. >>>>>> >>>>>> As for your question Danny. It depends which partitioning strategy >>>>>> you >>>>> use. >>>>>> >>>>>> For the HASH partitioning strategy I thought it would work as you >>>>>> explained. It would be N = MOD(expr, num). I am not sure though if we >>>>>> should introduce the PARTITIONS clause. Usually Flink does not own >>>>>> the >>>>>> data and the partitions are already an intrinsic property of the >>>>>> underlying source e.g. for kafka we do not create topics, but we just >>>>>> describe pre-existing pre-partitioned topic. >>>>>> >>>>>> 4. timestamp vs timestamp.field vs connector.field vs ... >>>>>> >>>>>> I am fine with changing it to timestamp.field to be consistent with >>>>>> other value.fields and key.fields. Actually that was also my initial >>>>>> proposal in a first draft I prepared. I changed it afterwards to >>>>>> shorten >>>>>> the key. >>>>>> >>>>>> Best, >>>>>> >>>>>> Dawid >>>>>> >>>>>> On 03/03/2020 09:00, Danny Chan wrote: >>>>>>> Thanks Dawid for bringing up this discussion, I think it is a useful >>>>>> feature ~ >>>>>>> >>>>>>> About how the metadata outputs from source >>>>>>> >>>>>>> I think it is completely orthogonal, computed column push down is >>>>>> another topic, this should not be a blocker but a promotion, if we do >>>>> not >>>>>> have any filters on the computed column, there is no need to do any >>>>>> pushings; the source node just emit the complete record with full >>>>> metadata >>>>>> with the declared physical schema, then when generating the virtual >>>>>> columns, we would extract the metadata info and output as full >>>>> columns(with >>>>>> full schema). >>>>>>> >>>>>>> About the type of metadata column >>>>>>> >>>>>>> Personally i prefer explicit type instead of CAST, they are symantic >>>>>> equivalent though, explict type is more straight-forward and we can >>>>> declare >>>>>> the nullable attribute there. >>>>>>> >>>>>>> About option A: partitioning based on acomputed column VS option B: >>>>>> partitioning with just a function >>>>>>> >>>>>>> From the FLIP, it seems that B's partitioning is just a strategy >>>>>>> when >>>>>> writing data, the partiton column is not included in the table >>>>>> schema, >>>>> so >>>>>> it's just useless when reading from that. >>>>>>> >>>>>>> - Compared to A, we do not need to generate the partition column >>>>>>> when >>>>>> selecting from the table(but insert into) >>>>>>> - For A we can also mark the column as STORED when we want to >>>>>>> persist >>>>>> that >>>>>>> >>>>>>> So in my opition they are orthogonal, we can support both, i saw >>>>>>> that >>>>>> MySQL/Oracle[1][2] would suggest to also define the PARTITIONS >>>>>> num, and >>>>> the >>>>>> partitions are managed under a "tablenamespace", the partition in >>>>>> which >>>>> the >>>>>> record is stored is partition number N, where N = MOD(expr, num), for >>>>> your >>>>>> design, which partiton the record would persist ? >>>>>>> >>>>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html >>>>>>> [2] >>>>>> >>>>> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 >>>>> >>>>>>> >>>>>>> Best, >>>>>>> Danny Chan >>>>>>> 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz >>>>>>> <[hidden email] >>>>>> ,写道: >>>>>>>> Hi Jark, >>>>>>>> Ad. 2 I added a section to discuss relation to FLIP-63 >>>>>>>> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. >>>>>> Therefore you have the key.format.type. >>>>>>>> I also considered exactly what you are suggesting (prefixing with >>>>>> connector or kafka). I should've put that into an Option/Rejected >>>>>> alternatives. >>>>>>>> I agree timestamp, key.*, value.* are connector properties. Why I >>>>>> wanted to suggest not adding that prefix in the first version is that >>>>>> actually all the properties in the WITH section are connector >>>>> properties. >>>>>> Even format is in the end a connector property as some of the sources >>>>> might >>>>>> not have a format, imo. The benefit of not adding the prefix is >>>>>> that it >>>>>> makes the keys a bit shorter. Imagine prefixing all the properties >>>>>> with >>>>>> connector (or if we go with FLINK-12557: elasticsearch): >>>>>>>> elasticsearch.key.format.type: csv >>>>>>>> elasticsearch.key.format.field: .... >>>>>>>> elasticsearch.key.format.delimiter: .... >>>>>>>> elasticsearch.key.format.*: .... >>>>>>>> I am fine with doing it though if this is a preferred approach >>>>>>>> in the >>>>>> community. >>>>>>>> Ad in-line comments: >>>>>>>> I forgot to update the `value.fields.include` property. It >>>>>>>> should be >>>>>> value.fields-include. Which I think you also suggested in the >>>>>> comment, >>>>>> right? >>>>>>>> As for the cast vs declaring output type of computed column. I >>>>>>>> think >>>>>> it's better not to use CAST, but declare a type of an expression and >>>>> later >>>>>> on infer the output type of SYSTEM_METADATA. The reason is I think >>>>>> this >>>>> way >>>>>> it will be easier to implement e.g. filter push downs when working >>>>>> with >>>>> the >>>>>> native types of the source, e.g. in case of Kafka's offset, i >>>>>> think it's >>>>>> better to pushdown long rather than string. This could let us push >>>>>> expression like e.g. offset > 12345 & offset < 59382. Otherwise we >>>>>> would >>>>>> have to push down cast(offset, long) > 12345 && cast(offset, long) < >>>>> 59382. >>>>>> Moreover I think we need to introduce the type for computed columns >>>>> anyway >>>>>> to support functions that infer output type based on expected return >>>>> type. >>>>>>>> As for the computed column push down. Yes, SYSTEM_METADATA would >>>>>>>> have >>>>>> to be pushed down to the source. If it is not possible the planner >>>>> should >>>>>> fail. As far as I know computed columns push down will be part of >>>>>> source >>>>>> rework, won't it? ;) >>>>>>>> As for the persisted computed column. I think it is completely >>>>>> orthogonal. In my current proposal you can also partition by a >>>>>> computed >>>>>> column. The difference between using a udf in partitioned by vs >>>>> partitioned >>>>>> by a computed column is that when you partition by a computed column >>>>> this >>>>>> column must be also computed when reading the table. If you use a >>>>>> udf in >>>>>> the partitioned by, the expression is computed only when inserting >>>>>> into >>>>> the >>>>>> table. >>>>>>>> Hope this answers some of your questions. Looking forward for >>>>>>>> further >>>>>> suggestions. >>>>>>>> Best, >>>>>>>> Dawid >>>>>>>> >>>>>>>> >>>>>>>> On 02/03/2020 05:18, Jark Wu wrote: >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> Thanks Dawid for starting such a great discussion. Reaing metadata >>>>> and >>>>>>>>> key-part information from source is an important feature for >>>>> streaming >>>>>>>>> users. >>>>>>>>> >>>>>>>>> In general, I agree with the proposal of the FLIP. >>>>>>>>> I will leave my thoughts and comments here: >>>>>>>>> >>>>>>>>> 1) +1 to use connector properties instead of introducing HEADER >>>>>> keyword as >>>>>>>>> the reason you mentioned in the FLIP. >>>>>>>>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we >>>>>>>>> should >>>>>> add a >>>>>>>>> section to explain what's the relationship between them. >>>>>>>>> Do their concepts conflict? Could INSERT PARTITION be used >>>>>>>>> on the >>>>>>>>> PARTITIONED table in this FLIP? >>>>>>>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we >>>>> make >>>>>> the >>>>>>>>> new introduced properties more hierarchical? >>>>>>>>> For example, "timestamp" => "connector.timestamp"? >>>>>>>>> (actually, I >>>>>> prefer >>>>>>>>> "kafka.timestamp" which is another improvement for properties >>>>>> FLINK-12557) >>>>>>>>> A single "timestamp" in properties may mislead users that the >>>>> field >>>>>> is >>>>>>>>> a rowtime attribute. >>>>>>>>> >>>>>>>>> I also left some minor comments in the FLIP. >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Jark >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz < >>>>> [hidden email]> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> I would like to propose an improvement that would enable reading >>>>> table >>>>>>>>>> columns from different parts of source records. Besides the main >>>>>> payload >>>>>>>>>> majority (if not all of the sources) expose additional >>>>> information. It >>>>>>>>>> can be simply a read-only metadata such as offset, ingestion time >>>>> or a >>>>>>>>>> read and write parts of the record that contain data but >>>>> additionally >>>>>>>>>> serve different purposes (partitioning, compaction etc.), e.g. >>>>>>>>>> key >>>>> or >>>>>>>>>> timestamp in Kafka. >>>>>>>>>> >>>>>>>>>> We should make it possible to read and write data from all of >>>>>>>>>> those >>>>>>>>>> locations. In this proposal I discuss reading partitioning data, >>>>> for >>>>>>>>>> completeness this proposal discusses also the partitioning when >>>>>> writing >>>>>>>>>> data out. >>>>>>>>>> >>>>>>>>>> I am looking forward to your comments. >>>>>>>>>> >>>>>>>>>> You can access the FLIP here: >>>>>>>>>> >>>>>>>>>> >>>>>> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode >>>>> >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> >>>>>>>>>> Dawid >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>> >>>>>> >>>>> >>>> >>> >> > |
Hi Timo,
Thanks a lot for picking up this FLIP. I believe it's a very important one for almost everyone who uses Flink SQL with Kafka. Also +1 to leave out partitioning for now. Best, Konstantin On Fri, Sep 4, 2020 at 1:37 PM Aljoscha Krettek <[hidden email]> wrote: > I like the proposal! I didn't check the implementation section in detail > but the SQL DDL examples look good as well as the options for specifying > how fields are mapped to keys/values look good. > > Aljoscha > > On 04.09.20 11:47, Dawid Wysakowicz wrote: > > Hi Timo, > > > > Thank you very much for the update. It indeed covers the full story in > > more details. I agree with the proposal. > > > > On 04/09/2020 10:48, Timo Walther wrote: > >> Hi everyone, > >> > >> I completely reworked FLIP-107. It now covers the full story how to > >> read and write metadata from/to connectors and formats. It considers > >> all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It > >> introduces the concept of PERSISTED computed columns and leaves out > >> partitioning for now. > >> > >> Looking forward to your feedback. > >> > >> Regards, > >> Timo > >> > >> > >> On 04.03.20 09:45, Kurt Young wrote: > >>> Sorry, forgot one question. > >>> > >>> 4. Can we make the value.fields-include more orthogonal? Like one can > >>> specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". > >>> With current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users can not > >>> config to > >>> just ignore timestamp but keep key. > >>> > >>> Best, > >>> Kurt > >>> > >>> > >>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <[hidden email]> wrote: > >>> > >>>> Hi Dawid, > >>>> > >>>> I have a couple of questions around key fields, actually I also have > >>>> some > >>>> other questions but want to be focused on key fields first. > >>>> > >>>> 1. I don't fully understand the usage of "key.fields". Is this > >>>> option only > >>>> valid during write operation? Because for > >>>> reading, I can't imagine how such options can be applied. I would > >>>> expect > >>>> that there might be a SYSTEM_METADATA("key") > >>>> to read and assign the key to a normal field? > >>>> > >>>> 2. If "key.fields" is only valid in write operation, I want to > >>>> propose we > >>>> can simplify the options to not introducing key.format.type and > >>>> other related options. I think a single "key.field" (not fields) > >>>> would be > >>>> enough, users can use UDF to calculate whatever key they > >>>> want before sink. > >>>> > >>>> 3. Also I don't want to introduce "value.format.type" and > >>>> "value.format.xxx" with the "value" prefix. Not every connector has a > >>>> concept > >>>> of key and values. The old parameter "format.type" already good > >>>> enough to > >>>> use. > >>>> > >>>> Best, > >>>> Kurt > >>>> > >>>> > >>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <[hidden email]> wrote: > >>>> > >>>>> Thanks Dawid, > >>>>> > >>>>> I have two more questions. > >>>>> > >>>>>> SupportsMetadata > >>>>> Introducing SupportsMetadata sounds good to me. But I have some > >>>>> questions > >>>>> regarding to this interface. > >>>>> 1) How do the source know what the expected return type of each > >>>>> metadata? > >>>>> 2) Where to put the metadata fields? Append to the existing physical > >>>>> fields? > >>>>> If yes, I would suggest to change the signature to `TableSource > >>>>> appendMetadataFields(String[] metadataNames, DataType[] > >>>>> metadataTypes)` > >>>>> > >>>>>> SYSTEM_METADATA("partition") > >>>>> Can SYSTEM_METADATA() function be used nested in a computed column > >>>>> expression? If yes, how to specify the return type of > SYSTEM_METADATA? > >>>>> > >>>>> Best, > >>>>> Jark > >>>>> > >>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz < > [hidden email]> > >>>>> wrote: > >>>>> > >>>>>> Hi, > >>>>>> > >>>>>> 1. I thought a bit more on how the source would emit the columns > >>>>>> and I > >>>>>> now see its not exactly the same as regular columns. I see a need to > >>>>>> elaborate a bit more on that in the FLIP as you asked, Jark. > >>>>>> > >>>>>> I do agree mostly with Danny on how we should do that. One > additional > >>>>>> things I would introduce is an > >>>>>> > >>>>>> interface SupportsMetadata { > >>>>>> > >>>>>> boolean supportsMetadata(Set<String> metadataFields); > >>>>>> > >>>>>> TableSource generateMetadataFields(Set<String> metadataFields); > >>>>>> > >>>>>> } > >>>>>> > >>>>>> This way the source would have to declare/emit only the requested > >>>>>> metadata fields. In order not to clash with user defined fields. > When > >>>>>> emitting the metadata field I would prepend the column name with > >>>>>> __system_{property_name}. Therefore when requested > >>>>>> SYSTEM_METADATA("partition") the source would append a field > >>>>>> __system_partition to the schema. This would be never visible to the > >>>>>> user as it would be used only for the subsequent computed columns. > If > >>>>>> that makes sense to you, I will update the FLIP with this > >>>>>> description. > >>>>>> > >>>>>> 2. CAST vs explicit type in computed columns > >>>>>> > >>>>>> Here I agree with Danny. It is also the current state of the > >>>>>> proposal. > >>>>>> > >>>>>> 3. Partitioning on computed column vs function > >>>>>> > >>>>>> Here I also agree with Danny. I also think those are orthogonal. I > >>>>>> would > >>>>>> leave out the STORED computed columns out of the discussion. I > >>>>>> don't see > >>>>>> how do they relate to the partitioning. I already put both of those > >>>>>> cases in the document. We can either partition on a computed > >>>>>> column or > >>>>>> use a udf in a partioned by clause. I am fine with leaving out the > >>>>>> partitioning by udf in the first version if you still have some > >>>>> concerns. > >>>>>> > >>>>>> As for your question Danny. It depends which partitioning strategy > >>>>>> you > >>>>> use. > >>>>>> > >>>>>> For the HASH partitioning strategy I thought it would work as you > >>>>>> explained. It would be N = MOD(expr, num). I am not sure though if > we > >>>>>> should introduce the PARTITIONS clause. Usually Flink does not own > >>>>>> the > >>>>>> data and the partitions are already an intrinsic property of the > >>>>>> underlying source e.g. for kafka we do not create topics, but we > just > >>>>>> describe pre-existing pre-partitioned topic. > >>>>>> > >>>>>> 4. timestamp vs timestamp.field vs connector.field vs ... > >>>>>> > >>>>>> I am fine with changing it to timestamp.field to be consistent with > >>>>>> other value.fields and key.fields. Actually that was also my initial > >>>>>> proposal in a first draft I prepared. I changed it afterwards to > >>>>>> shorten > >>>>>> the key. > >>>>>> > >>>>>> Best, > >>>>>> > >>>>>> Dawid > >>>>>> > >>>>>> On 03/03/2020 09:00, Danny Chan wrote: > >>>>>>> Thanks Dawid for bringing up this discussion, I think it is a > useful > >>>>>> feature ~ > >>>>>>> > >>>>>>> About how the metadata outputs from source > >>>>>>> > >>>>>>> I think it is completely orthogonal, computed column push down is > >>>>>> another topic, this should not be a blocker but a promotion, if we > do > >>>>> not > >>>>>> have any filters on the computed column, there is no need to do any > >>>>>> pushings; the source node just emit the complete record with full > >>>>> metadata > >>>>>> with the declared physical schema, then when generating the virtual > >>>>>> columns, we would extract the metadata info and output as full > >>>>> columns(with > >>>>>> full schema). > >>>>>>> > >>>>>>> About the type of metadata column > >>>>>>> > >>>>>>> Personally i prefer explicit type instead of CAST, they are > symantic > >>>>>> equivalent though, explict type is more straight-forward and we can > >>>>> declare > >>>>>> the nullable attribute there. > >>>>>>> > >>>>>>> About option A: partitioning based on acomputed column VS option B: > >>>>>> partitioning with just a function > >>>>>>> > >>>>>>> From the FLIP, it seems that B's partitioning is just a strategy > >>>>>>> when > >>>>>> writing data, the partiton column is not included in the table > >>>>>> schema, > >>>>> so > >>>>>> it's just useless when reading from that. > >>>>>>> > >>>>>>> - Compared to A, we do not need to generate the partition column > >>>>>>> when > >>>>>> selecting from the table(but insert into) > >>>>>>> - For A we can also mark the column as STORED when we want to > >>>>>>> persist > >>>>>> that > >>>>>>> > >>>>>>> So in my opition they are orthogonal, we can support both, i saw > >>>>>>> that > >>>>>> MySQL/Oracle[1][2] would suggest to also define the PARTITIONS > >>>>>> num, and > >>>>> the > >>>>>> partitions are managed under a "tablenamespace", the partition in > >>>>>> which > >>>>> the > >>>>>> record is stored is partition number N, where N = MOD(expr, num), > for > >>>>> your > >>>>>> design, which partiton the record would persist ? > >>>>>>> > >>>>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html > >>>>>>> [2] > >>>>>> > >>>>> > https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 > >>>>> > >>>>>>> > >>>>>>> Best, > >>>>>>> Danny Chan > >>>>>>> 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz > >>>>>>> <[hidden email] > >>>>>> ,写道: > >>>>>>>> Hi Jark, > >>>>>>>> Ad. 2 I added a section to discuss relation to FLIP-63 > >>>>>>>> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. > >>>>>> Therefore you have the key.format.type. > >>>>>>>> I also considered exactly what you are suggesting (prefixing with > >>>>>> connector or kafka). I should've put that into an Option/Rejected > >>>>>> alternatives. > >>>>>>>> I agree timestamp, key.*, value.* are connector properties. Why I > >>>>>> wanted to suggest not adding that prefix in the first version is > that > >>>>>> actually all the properties in the WITH section are connector > >>>>> properties. > >>>>>> Even format is in the end a connector property as some of the > sources > >>>>> might > >>>>>> not have a format, imo. The benefit of not adding the prefix is > >>>>>> that it > >>>>>> makes the keys a bit shorter. Imagine prefixing all the properties > >>>>>> with > >>>>>> connector (or if we go with FLINK-12557: elasticsearch): > >>>>>>>> elasticsearch.key.format.type: csv > >>>>>>>> elasticsearch.key.format.field: .... > >>>>>>>> elasticsearch.key.format.delimiter: .... > >>>>>>>> elasticsearch.key.format.*: .... > >>>>>>>> I am fine with doing it though if this is a preferred approach > >>>>>>>> in the > >>>>>> community. > >>>>>>>> Ad in-line comments: > >>>>>>>> I forgot to update the `value.fields.include` property. It > >>>>>>>> should be > >>>>>> value.fields-include. Which I think you also suggested in the > >>>>>> comment, > >>>>>> right? > >>>>>>>> As for the cast vs declaring output type of computed column. I > >>>>>>>> think > >>>>>> it's better not to use CAST, but declare a type of an expression and > >>>>> later > >>>>>> on infer the output type of SYSTEM_METADATA. The reason is I think > >>>>>> this > >>>>> way > >>>>>> it will be easier to implement e.g. filter push downs when working > >>>>>> with > >>>>> the > >>>>>> native types of the source, e.g. in case of Kafka's offset, i > >>>>>> think it's > >>>>>> better to pushdown long rather than string. This could let us push > >>>>>> expression like e.g. offset > 12345 & offset < 59382. Otherwise we > >>>>>> would > >>>>>> have to push down cast(offset, long) > 12345 && cast(offset, long) < > >>>>> 59382. > >>>>>> Moreover I think we need to introduce the type for computed columns > >>>>> anyway > >>>>>> to support functions that infer output type based on expected return > >>>>> type. > >>>>>>>> As for the computed column push down. Yes, SYSTEM_METADATA would > >>>>>>>> have > >>>>>> to be pushed down to the source. If it is not possible the planner > >>>>> should > >>>>>> fail. As far as I know computed columns push down will be part of > >>>>>> source > >>>>>> rework, won't it? ;) > >>>>>>>> As for the persisted computed column. I think it is completely > >>>>>> orthogonal. In my current proposal you can also partition by a > >>>>>> computed > >>>>>> column. The difference between using a udf in partitioned by vs > >>>>> partitioned > >>>>>> by a computed column is that when you partition by a computed column > >>>>> this > >>>>>> column must be also computed when reading the table. If you use a > >>>>>> udf in > >>>>>> the partitioned by, the expression is computed only when inserting > >>>>>> into > >>>>> the > >>>>>> table. > >>>>>>>> Hope this answers some of your questions. Looking forward for > >>>>>>>> further > >>>>>> suggestions. > >>>>>>>> Best, > >>>>>>>> Dawid > >>>>>>>> > >>>>>>>> > >>>>>>>> On 02/03/2020 05:18, Jark Wu wrote: > >>>>>>>>> Hi, > >>>>>>>>> > >>>>>>>>> Thanks Dawid for starting such a great discussion. Reaing > metadata > >>>>> and > >>>>>>>>> key-part information from source is an important feature for > >>>>> streaming > >>>>>>>>> users. > >>>>>>>>> > >>>>>>>>> In general, I agree with the proposal of the FLIP. > >>>>>>>>> I will leave my thoughts and comments here: > >>>>>>>>> > >>>>>>>>> 1) +1 to use connector properties instead of introducing HEADER > >>>>>> keyword as > >>>>>>>>> the reason you mentioned in the FLIP. > >>>>>>>>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we > >>>>>>>>> should > >>>>>> add a > >>>>>>>>> section to explain what's the relationship between them. > >>>>>>>>> Do their concepts conflict? Could INSERT PARTITION be used > >>>>>>>>> on the > >>>>>>>>> PARTITIONED table in this FLIP? > >>>>>>>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we > >>>>> make > >>>>>> the > >>>>>>>>> new introduced properties more hierarchical? > >>>>>>>>> For example, "timestamp" => "connector.timestamp"? > >>>>>>>>> (actually, I > >>>>>> prefer > >>>>>>>>> "kafka.timestamp" which is another improvement for properties > >>>>>> FLINK-12557) > >>>>>>>>> A single "timestamp" in properties may mislead users that > the > >>>>> field > >>>>>> is > >>>>>>>>> a rowtime attribute. > >>>>>>>>> > >>>>>>>>> I also left some minor comments in the FLIP. > >>>>>>>>> > >>>>>>>>> Thanks, > >>>>>>>>> Jark > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz < > >>>>> [hidden email]> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi, > >>>>>>>>>> > >>>>>>>>>> I would like to propose an improvement that would enable reading > >>>>> table > >>>>>>>>>> columns from different parts of source records. Besides the main > >>>>>> payload > >>>>>>>>>> majority (if not all of the sources) expose additional > >>>>> information. It > >>>>>>>>>> can be simply a read-only metadata such as offset, ingestion > time > >>>>> or a > >>>>>>>>>> read and write parts of the record that contain data but > >>>>> additionally > >>>>>>>>>> serve different purposes (partitioning, compaction etc.), e.g. > >>>>>>>>>> key > >>>>> or > >>>>>>>>>> timestamp in Kafka. > >>>>>>>>>> > >>>>>>>>>> We should make it possible to read and write data from all of > >>>>>>>>>> those > >>>>>>>>>> locations. In this proposal I discuss reading partitioning data, > >>>>> for > >>>>>>>>>> completeness this proposal discusses also the partitioning when > >>>>>> writing > >>>>>>>>>> data out. > >>>>>>>>>> > >>>>>>>>>> I am looking forward to your comments. > >>>>>>>>>> > >>>>>>>>>> You can access the FLIP here: > >>>>>>>>>> > >>>>>>>>>> > >>>>>> > >>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode > >>>>> > >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> > >>>>>>>>>> Dawid > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > > > > -- Konstantin Knauf https://twitter.com/snntrable https://github.com/knaufk |
In reply to this post by Aljoscha Krettek-2
Thanks Timo for the update !
I like the proposal in general, I have some question about the doc. (1) About the DDL `CAST(SYSTEM_METADATA("offset") AS INT)`, It looks like we use `CAST` to resolve the nullability of meta column type, Could we use explicit type (i.e. SYSTEM_METADATA("offset”) INT NULL) directly? I think the proposed DDL `CAST(SYSTEM_METADATA() AS INT)` is a little complex from user perspective, it’s more like a query rather than a DDL. (2) The doc gives rich examples about reading/writing source metadata column, but missied expamle of reading/writing format metadata. Do we still use `SYSTEM_METADATA` for format metadata? Given a canal-json format data: ``` { "data": [ { "id": "102", "name": "car battery", "description": "12V car battery", "weight": "5.17" } ], "database": "inventory”, "table": "products", "es": 1589374013000, "ts": 1589374013680, "type": "DELETE" } ``` For the metadata database name and table name, can we read them by SYSTEM_METADATA("database"), SYSTEM_METADATA("table") ? (3) Is there exists same meta column in key meta fields, value meta fields and source meta fields? How we distinguish them If them exits although I did not meet this case yet. Maybe we can skip this question... (4) About Reading and writing from key and value section, we bind that the fields of key part must belong to the fields of value part according to the options 'key.fields' = 'id, name' and 'value.fields-include' = 'ALL', Is this by design? I think the key fields and value fields are independent each other in Kafka. Best Leonard > 在 2020年9月4日,19:37,Aljoscha Krettek <[hidden email]> 写道: > > I like the proposal! I didn't check the implementation section in detail but the SQL DDL examples look good as well as the options for specifying how fields are mapped to keys/values look good. > > Aljoscha > > On 04.09.20 11:47, Dawid Wysakowicz wrote: >> Hi Timo, >> Thank you very much for the update. It indeed covers the full story in >> more details. I agree with the proposal. >> On 04/09/2020 10:48, Timo Walther wrote: >>> Hi everyone, >>> >>> I completely reworked FLIP-107. It now covers the full story how to >>> read and write metadata from/to connectors and formats. It considers >>> all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It >>> introduces the concept of PERSISTED computed columns and leaves out >>> partitioning for now. >>> >>> Looking forward to your feedback. >>> >>> Regards, >>> Timo >>> >>> >>> On 04.03.20 09:45, Kurt Young wrote: >>>> Sorry, forgot one question. >>>> >>>> 4. Can we make the value.fields-include more orthogonal? Like one can >>>> specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". >>>> With current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users can not >>>> config to >>>> just ignore timestamp but keep key. >>>> >>>> Best, >>>> Kurt >>>> >>>> >>>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <[hidden email]> wrote: >>>> >>>>> Hi Dawid, >>>>> >>>>> I have a couple of questions around key fields, actually I also have >>>>> some >>>>> other questions but want to be focused on key fields first. >>>>> >>>>> 1. I don't fully understand the usage of "key.fields". Is this >>>>> option only >>>>> valid during write operation? Because for >>>>> reading, I can't imagine how such options can be applied. I would >>>>> expect >>>>> that there might be a SYSTEM_METADATA("key") >>>>> to read and assign the key to a normal field? >>>>> >>>>> 2. If "key.fields" is only valid in write operation, I want to >>>>> propose we >>>>> can simplify the options to not introducing key.format.type and >>>>> other related options. I think a single "key.field" (not fields) >>>>> would be >>>>> enough, users can use UDF to calculate whatever key they >>>>> want before sink. >>>>> >>>>> 3. Also I don't want to introduce "value.format.type" and >>>>> "value.format.xxx" with the "value" prefix. Not every connector has a >>>>> concept >>>>> of key and values. The old parameter "format.type" already good >>>>> enough to >>>>> use. >>>>> >>>>> Best, >>>>> Kurt >>>>> >>>>> >>>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <[hidden email]> wrote: >>>>> >>>>>> Thanks Dawid, >>>>>> >>>>>> I have two more questions. >>>>>> >>>>>>> SupportsMetadata >>>>>> Introducing SupportsMetadata sounds good to me. But I have some >>>>>> questions >>>>>> regarding to this interface. >>>>>> 1) How do the source know what the expected return type of each >>>>>> metadata? >>>>>> 2) Where to put the metadata fields? Append to the existing physical >>>>>> fields? >>>>>> If yes, I would suggest to change the signature to `TableSource >>>>>> appendMetadataFields(String[] metadataNames, DataType[] >>>>>> metadataTypes)` >>>>>> >>>>>>> SYSTEM_METADATA("partition") >>>>>> Can SYSTEM_METADATA() function be used nested in a computed column >>>>>> expression? If yes, how to specify the return type of SYSTEM_METADATA? >>>>>> >>>>>> Best, >>>>>> Jark >>>>>> >>>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <[hidden email]> >>>>>> wrote: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> 1. I thought a bit more on how the source would emit the columns >>>>>>> and I >>>>>>> now see its not exactly the same as regular columns. I see a need to >>>>>>> elaborate a bit more on that in the FLIP as you asked, Jark. >>>>>>> >>>>>>> I do agree mostly with Danny on how we should do that. One additional >>>>>>> things I would introduce is an >>>>>>> >>>>>>> interface SupportsMetadata { >>>>>>> >>>>>>> boolean supportsMetadata(Set<String> metadataFields); >>>>>>> >>>>>>> TableSource generateMetadataFields(Set<String> metadataFields); >>>>>>> >>>>>>> } >>>>>>> >>>>>>> This way the source would have to declare/emit only the requested >>>>>>> metadata fields. In order not to clash with user defined fields. When >>>>>>> emitting the metadata field I would prepend the column name with >>>>>>> __system_{property_name}. Therefore when requested >>>>>>> SYSTEM_METADATA("partition") the source would append a field >>>>>>> __system_partition to the schema. This would be never visible to the >>>>>>> user as it would be used only for the subsequent computed columns. If >>>>>>> that makes sense to you, I will update the FLIP with this >>>>>>> description. >>>>>>> >>>>>>> 2. CAST vs explicit type in computed columns >>>>>>> >>>>>>> Here I agree with Danny. It is also the current state of the >>>>>>> proposal. >>>>>>> >>>>>>> 3. Partitioning on computed column vs function >>>>>>> >>>>>>> Here I also agree with Danny. I also think those are orthogonal. I >>>>>>> would >>>>>>> leave out the STORED computed columns out of the discussion. I >>>>>>> don't see >>>>>>> how do they relate to the partitioning. I already put both of those >>>>>>> cases in the document. We can either partition on a computed >>>>>>> column or >>>>>>> use a udf in a partioned by clause. I am fine with leaving out the >>>>>>> partitioning by udf in the first version if you still have some >>>>>> concerns. >>>>>>> >>>>>>> As for your question Danny. It depends which partitioning strategy >>>>>>> you >>>>>> use. >>>>>>> >>>>>>> For the HASH partitioning strategy I thought it would work as you >>>>>>> explained. It would be N = MOD(expr, num). I am not sure though if we >>>>>>> should introduce the PARTITIONS clause. Usually Flink does not own >>>>>>> the >>>>>>> data and the partitions are already an intrinsic property of the >>>>>>> underlying source e.g. for kafka we do not create topics, but we just >>>>>>> describe pre-existing pre-partitioned topic. >>>>>>> >>>>>>> 4. timestamp vs timestamp.field vs connector.field vs ... >>>>>>> >>>>>>> I am fine with changing it to timestamp.field to be consistent with >>>>>>> other value.fields and key.fields. Actually that was also my initial >>>>>>> proposal in a first draft I prepared. I changed it afterwards to >>>>>>> shorten >>>>>>> the key. >>>>>>> >>>>>>> Best, >>>>>>> >>>>>>> Dawid >>>>>>> >>>>>>> On 03/03/2020 09:00, Danny Chan wrote: >>>>>>>> Thanks Dawid for bringing up this discussion, I think it is a useful >>>>>>> feature ~ >>>>>>>> >>>>>>>> About how the metadata outputs from source >>>>>>>> >>>>>>>> I think it is completely orthogonal, computed column push down is >>>>>>> another topic, this should not be a blocker but a promotion, if we do >>>>>> not >>>>>>> have any filters on the computed column, there is no need to do any >>>>>>> pushings; the source node just emit the complete record with full >>>>>> metadata >>>>>>> with the declared physical schema, then when generating the virtual >>>>>>> columns, we would extract the metadata info and output as full >>>>>> columns(with >>>>>>> full schema). >>>>>>>> >>>>>>>> About the type of metadata column >>>>>>>> >>>>>>>> Personally i prefer explicit type instead of CAST, they are symantic >>>>>>> equivalent though, explict type is more straight-forward and we can >>>>>> declare >>>>>>> the nullable attribute there. >>>>>>>> >>>>>>>> About option A: partitioning based on acomputed column VS option B: >>>>>>> partitioning with just a function >>>>>>>> >>>>>>>> From the FLIP, it seems that B's partitioning is just a strategy >>>>>>>> when >>>>>>> writing data, the partiton column is not included in the table >>>>>>> schema, >>>>>> so >>>>>>> it's just useless when reading from that. >>>>>>>> >>>>>>>> - Compared to A, we do not need to generate the partition column >>>>>>>> when >>>>>>> selecting from the table(but insert into) >>>>>>>> - For A we can also mark the column as STORED when we want to >>>>>>>> persist >>>>>>> that >>>>>>>> >>>>>>>> So in my opition they are orthogonal, we can support both, i saw >>>>>>>> that >>>>>>> MySQL/Oracle[1][2] would suggest to also define the PARTITIONS >>>>>>> num, and >>>>>> the >>>>>>> partitions are managed under a "tablenamespace", the partition in >>>>>>> which >>>>>> the >>>>>>> record is stored is partition number N, where N = MOD(expr, num), for >>>>>> your >>>>>>> design, which partiton the record would persist ? >>>>>>>> >>>>>>>> [1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html >>>>>>>> [2] >>>>>>> >>>>>> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270 >>>>>> >>>>>>>> >>>>>>>> Best, >>>>>>>> Danny Chan >>>>>>>> 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz >>>>>>>> <[hidden email] >>>>>>> ,写道: >>>>>>>>> Hi Jark, >>>>>>>>> Ad. 2 I added a section to discuss relation to FLIP-63 >>>>>>>>> Ad. 3 Yes, I also tried to somewhat keep hierarchy of properties. >>>>>>> Therefore you have the key.format.type. >>>>>>>>> I also considered exactly what you are suggesting (prefixing with >>>>>>> connector or kafka). I should've put that into an Option/Rejected >>>>>>> alternatives. >>>>>>>>> I agree timestamp, key.*, value.* are connector properties. Why I >>>>>>> wanted to suggest not adding that prefix in the first version is that >>>>>>> actually all the properties in the WITH section are connector >>>>>> properties. >>>>>>> Even format is in the end a connector property as some of the sources >>>>>> might >>>>>>> not have a format, imo. The benefit of not adding the prefix is >>>>>>> that it >>>>>>> makes the keys a bit shorter. Imagine prefixing all the properties >>>>>>> with >>>>>>> connector (or if we go with FLINK-12557: elasticsearch): >>>>>>>>> elasticsearch.key.format.type: csv >>>>>>>>> elasticsearch.key.format.field: .... >>>>>>>>> elasticsearch.key.format.delimiter: .... >>>>>>>>> elasticsearch.key.format.*: .... >>>>>>>>> I am fine with doing it though if this is a preferred approach >>>>>>>>> in the >>>>>>> community. >>>>>>>>> Ad in-line comments: >>>>>>>>> I forgot to update the `value.fields.include` property. It >>>>>>>>> should be >>>>>>> value.fields-include. Which I think you also suggested in the >>>>>>> comment, >>>>>>> right? >>>>>>>>> As for the cast vs declaring output type of computed column. I >>>>>>>>> think >>>>>>> it's better not to use CAST, but declare a type of an expression and >>>>>> later >>>>>>> on infer the output type of SYSTEM_METADATA. The reason is I think >>>>>>> this >>>>>> way >>>>>>> it will be easier to implement e.g. filter push downs when working >>>>>>> with >>>>>> the >>>>>>> native types of the source, e.g. in case of Kafka's offset, i >>>>>>> think it's >>>>>>> better to pushdown long rather than string. This could let us push >>>>>>> expression like e.g. offset > 12345 & offset < 59382. Otherwise we >>>>>>> would >>>>>>> have to push down cast(offset, long) > 12345 && cast(offset, long) < >>>>>> 59382. >>>>>>> Moreover I think we need to introduce the type for computed columns >>>>>> anyway >>>>>>> to support functions that infer output type based on expected return >>>>>> type. >>>>>>>>> As for the computed column push down. Yes, SYSTEM_METADATA would >>>>>>>>> have >>>>>>> to be pushed down to the source. If it is not possible the planner >>>>>> should >>>>>>> fail. As far as I know computed columns push down will be part of >>>>>>> source >>>>>>> rework, won't it? ;) >>>>>>>>> As for the persisted computed column. I think it is completely >>>>>>> orthogonal. In my current proposal you can also partition by a >>>>>>> computed >>>>>>> column. The difference between using a udf in partitioned by vs >>>>>> partitioned >>>>>>> by a computed column is that when you partition by a computed column >>>>>> this >>>>>>> column must be also computed when reading the table. If you use a >>>>>>> udf in >>>>>>> the partitioned by, the expression is computed only when inserting >>>>>>> into >>>>>> the >>>>>>> table. >>>>>>>>> Hope this answers some of your questions. Looking forward for >>>>>>>>> further >>>>>>> suggestions. >>>>>>>>> Best, >>>>>>>>> Dawid >>>>>>>>> >>>>>>>>> >>>>>>>>> On 02/03/2020 05:18, Jark Wu wrote: >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> Thanks Dawid for starting such a great discussion. Reaing metadata >>>>>> and >>>>>>>>>> key-part information from source is an important feature for >>>>>> streaming >>>>>>>>>> users. >>>>>>>>>> >>>>>>>>>> In general, I agree with the proposal of the FLIP. >>>>>>>>>> I will leave my thoughts and comments here: >>>>>>>>>> >>>>>>>>>> 1) +1 to use connector properties instead of introducing HEADER >>>>>>> keyword as >>>>>>>>>> the reason you mentioned in the FLIP. >>>>>>>>>> 2) we already introduced PARTITIONED BY in FLIP-63. Maybe we >>>>>>>>>> should >>>>>>> add a >>>>>>>>>> section to explain what's the relationship between them. >>>>>>>>>> Do their concepts conflict? Could INSERT PARTITION be used >>>>>>>>>> on the >>>>>>>>>> PARTITIONED table in this FLIP? >>>>>>>>>> 3) Currently, properties are hierarchical in Flink SQL. Shall we >>>>>> make >>>>>>> the >>>>>>>>>> new introduced properties more hierarchical? >>>>>>>>>> For example, "timestamp" => "connector.timestamp"? >>>>>>>>>> (actually, I >>>>>>> prefer >>>>>>>>>> "kafka.timestamp" which is another improvement for properties >>>>>>> FLINK-12557) >>>>>>>>>> A single "timestamp" in properties may mislead users that the >>>>>> field >>>>>>> is >>>>>>>>>> a rowtime attribute. >>>>>>>>>> >>>>>>>>>> I also left some minor comments in the FLIP. >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Jark >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz < >>>>>> [hidden email]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi, >>>>>>>>>>> >>>>>>>>>>> I would like to propose an improvement that would enable reading >>>>>> table >>>>>>>>>>> columns from different parts of source records. Besides the main >>>>>>> payload >>>>>>>>>>> majority (if not all of the sources) expose additional >>>>>> information. It >>>>>>>>>>> can be simply a read-only metadata such as offset, ingestion time >>>>>> or a >>>>>>>>>>> read and write parts of the record that contain data but >>>>>> additionally >>>>>>>>>>> serve different purposes (partitioning, compaction etc.), e.g. >>>>>>>>>>> key >>>>>> or >>>>>>>>>>> timestamp in Kafka. >>>>>>>>>>> >>>>>>>>>>> We should make it possible to read and write data from all of >>>>>>>>>>> those >>>>>>>>>>> locations. In this proposal I discuss reading partitioning data, >>>>>> for >>>>>>>>>>> completeness this proposal discusses also the partitioning when >>>>>>> writing >>>>>>>>>>> data out. >>>>>>>>>>> >>>>>>>>>>> I am looking forward to your comments. >>>>>>>>>>> >>>>>>>>>>> You can access the FLIP here: >>>>>>>>>>> >>>>>>>>>>> >>>>>>> >>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode >>>>>> >>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> >>>>>>>>>>> Dawid >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> > |
Ignore my question(4), I’ve found the answer in the doc : 'value.fields-include' = ‘EXCEPT_KEY' (all fields of the schema minus fields of the key)
> 在 2020年9月7日,16:33,Leonard Xu <[hidden email]> 写道: > > (4) About Reading and writing from key and value section, we bind that the fields of key part must belong to the fields of value part according to the options 'key.fields' = 'id, name' and 'value.fields-include' = 'ALL', Is this by design? I think the key fields and value fields are independent each other in Kafka. > |
Hi Leonard,
thanks for your feedback. (1) Actually, I discuss this already in the FLIP. But let me summarize our options again if it was not clear enough in the FLIP: a) CREATE TABLE t (a AS CAST(SYSTEM_METADATA("offset") AS INT)) pro: readable, complex arithmetic possible, more SQL compliant, SQL Server compliant con: long b) CREATE TABLE t (a INT AS SYSTEM_METADATA("offset")) pro: shorter, not SQL nor SQL Server compliant con: requires parser changes, no complex arithmetic like `computeSomeThing(SYSTEM_METADATA("offset"))` possible c) CREATE TABLE t (a AS SYSTEM_METADATA("offset", INT)) pro: shorter, very readable, complex arithmetic possible con: non SQL expression, requires parser changes So I decided for a) with less disadvantages. 2) Yes, a format can expose its metadata through the mentioned interfaces in the FLIP. I added an example to the FLIP. 3) The concept of a key or value format is connector specific. And since the table source/table sinks are responsible for returning the metadata columns. We can allow this in the future due to the flexibility of the design. But I also don't think that we need this case for now. I think we can focus on the value format and ignore metadata from the key. Regards, Timo On 07.09.20 11:03, Leonard Xu wrote: > Ignore my question(4), I’ve found the answer in the doc : 'value.fields-include' = ‘EXCEPT_KEY' (all fields of the schema minus fields of the key) > >> 在 2020年9月7日,16:33,Leonard Xu <[hidden email]> 写道: >> >> (4) About Reading and writing from key and value section, we bind that the fields of key part must belong to the fields of value part according to the options 'key.fields' = 'id, name' and 'value.fields-include' = 'ALL', Is this by design? I think the key fields and value fields are independent each other in Kafka. >> > > |
Thanks Timo,
I think this FLIP is already in great shape! I have following questions: 1. `Map<String, DataType> listReadableMetadata()` only allows one possible DataType for a metadata key. However, users may expect to use different types, e.g. for "timestamp" metadata, users may use it as BIGINT, or TIMESTAMP(6) WITH LOCAL TIME ZONE or TIMESTAMP(3) WITH LOCAL TIME ZONE. Do we force users to use the specific types or can use several types in the CAST? 2. Why does the `DecodingFormat#applyReadableMetadata(List<String> metadataKeys)` don't need the `DataType outputDataType` parameter? 3. I think it would be great if we can list the metadata keys (and readable/writable) we want to expose in the first version. I think they are also important public APIs, like connector options? Best, Jark On Mon, 7 Sep 2020 at 18:28, Timo Walther <[hidden email]> wrote: > Hi Leonard, > > thanks for your feedback. > > (1) Actually, I discuss this already in the FLIP. But let me summarize > our options again if it was not clear enough in the FLIP: > > a) CREATE TABLE t (a AS CAST(SYSTEM_METADATA("offset") AS INT)) > pro: readable, complex arithmetic possible, more SQL compliant, SQL > Server compliant > con: long > > b) CREATE TABLE t (a INT AS SYSTEM_METADATA("offset")) > pro: shorter, not SQL nor SQL Server compliant > con: requires parser changes, no complex arithmetic like > `computeSomeThing(SYSTEM_METADATA("offset"))` possible > > c) CREATE TABLE t (a AS SYSTEM_METADATA("offset", INT)) > pro: shorter, very readable, complex arithmetic possible > con: non SQL expression, requires parser changes > > So I decided for a) with less disadvantages. > > 2) Yes, a format can expose its metadata through the mentioned > interfaces in the FLIP. I added an example to the FLIP. > > 3) The concept of a key or value format is connector specific. And since > the table source/table sinks are responsible for returning the metadata > columns. We can allow this in the future due to the flexibility of the > design. But I also don't think that we need this case for now. I think > we can focus on the value format and ignore metadata from the key. > > Regards, > Timo > > > On 07.09.20 11:03, Leonard Xu wrote: > > Ignore my question(4), I’ve found the answer in the doc : > 'value.fields-include' = ‘EXCEPT_KEY' (all fields of the schema minus > fields of the key) > > > >> 在 2020年9月7日,16:33,Leonard Xu <[hidden email]> 写道: > >> > >> (4) About Reading and writing from key and value section, we bind that > the fields of key part must belong to the fields of value part according to > the options 'key.fields' = 'id, name' and 'value.fields-include' = 'ALL', > Is this by design? I think the key fields and value fields are independent > each other in Kafka. > >> > > > > > > |
Sorry, I forgot to ask one more question.
4. Do we allow to use the SYSTEM_METADATA as a sub-expression? For example, checksum AS CAST(CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>)['checksum'] AS STRING), myvalue AS CAST(CAST(SYSTEM_METADATA("headers") AS MAP<STRING, BYTES>)['mykey'] AS BIGINT) And we will push down only one "headers" metadata, right? Best, Jark On Mon, 7 Sep 2020 at 19:55, Jark Wu <[hidden email]> wrote: > Thanks Timo, > > I think this FLIP is already in great shape! > > I have following questions: > > 1. `Map<String, DataType> listReadableMetadata()` only allows one possible > DataType for a metadata key. > However, users may expect to use different types, e.g. for "timestamp" > metadata, users may use it as BIGINT, or TIMESTAMP(6) WITH LOCAL TIME ZONE > or TIMESTAMP(3) WITH LOCAL TIME ZONE. > Do we force users to use the specific types or can use several types in > the CAST? > > 2. Why does the `DecodingFormat#applyReadableMetadata(List<String> > metadataKeys)` don't need the `DataType outputDataType` parameter? > > 3. I think it would be great if we can list the metadata keys (and > readable/writable) we want to expose in the first version. I think they are > also important public APIs, like connector options? > > Best, > Jark > > On Mon, 7 Sep 2020 at 18:28, Timo Walther <[hidden email]> wrote: > >> Hi Leonard, >> >> thanks for your feedback. >> >> (1) Actually, I discuss this already in the FLIP. But let me summarize >> our options again if it was not clear enough in the FLIP: >> >> a) CREATE TABLE t (a AS CAST(SYSTEM_METADATA("offset") AS INT)) >> pro: readable, complex arithmetic possible, more SQL compliant, SQL >> Server compliant >> con: long >> >> b) CREATE TABLE t (a INT AS SYSTEM_METADATA("offset")) >> pro: shorter, not SQL nor SQL Server compliant >> con: requires parser changes, no complex arithmetic like >> `computeSomeThing(SYSTEM_METADATA("offset"))` possible >> >> c) CREATE TABLE t (a AS SYSTEM_METADATA("offset", INT)) >> pro: shorter, very readable, complex arithmetic possible >> con: non SQL expression, requires parser changes >> >> So I decided for a) with less disadvantages. >> >> 2) Yes, a format can expose its metadata through the mentioned >> interfaces in the FLIP. I added an example to the FLIP. >> >> 3) The concept of a key or value format is connector specific. And since >> the table source/table sinks are responsible for returning the metadata >> columns. We can allow this in the future due to the flexibility of the >> design. But I also don't think that we need this case for now. I think >> we can focus on the value format and ignore metadata from the key. >> >> Regards, >> Timo >> >> >> On 07.09.20 11:03, Leonard Xu wrote: >> > Ignore my question(4), I’ve found the answer in the doc : >> 'value.fields-include' = ‘EXCEPT_KEY' (all fields of the schema minus >> fields of the key) >> > >> >> 在 2020年9月7日,16:33,Leonard Xu <[hidden email]> 写道: >> >> >> >> (4) About Reading and writing from key and value section, we bind that >> the fields of key part must belong to the fields of value part according to >> the options 'key.fields' = 'id, name' and 'value.fields-include' = 'ALL', >> Is this by design? I think the key fields and value fields are independent >> each other in Kafka. >> >> >> > >> > >> >> |
Hi Jark,
1. "`Map<String, DataType> listReadableMetadata()` only allows one possible DataType for a metadata key." I was thinking about this topic a lot today. My conclusion is: yes, we should force users to specify the type as documented. Users can further cast or compute using expressions to more specific types. I decided for BIGINT instead of TIMESTAMP(3) for Kafka timestamps, I think for metadata we should directly forward the underlying atomic type of the external system. And for a Kafka consumer record this is BIGINT without any timezone interpretation. Users can further cast to TIMESTAMP(3) if necessary. I wouldn't introduce too much magic here. What do you think? 2. I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a DataType argument. This argument would need to be created by the source then. Do you have an example in mind? In any case the format could also calculate it later via: producedDataType + metadata columns 3. "list the metadata keys" I went through the list of current connectors and formats. I updated the FLIP for the Kafka and Debezium. For the key design, I used the FLIP-122 naming schema. For HBase, Elasticsearch and others I could not find metadata that might be important for users. 4. "sub-expression" Yes, sub-expression like the ones you mentioned would be allowed. We will push down only one "headers" metadata. Regards, Timo On 07.09.20 14:41, Jark Wu wrote: > Sorry, I forgot to ask one more question. > > 4. Do we allow to use the SYSTEM_METADATA as a sub-expression? For example, > > checksum AS CAST(CAST(SYSTEM_METADATA("headers") AS MAP<STRING, > BYTES>)['checksum'] AS STRING), > myvalue AS CAST(CAST(SYSTEM_METADATA("headers") AS MAP<STRING, > BYTES>)['mykey'] AS BIGINT) > > And we will push down only one "headers" metadata, right? > > Best, > Jark > > > > On Mon, 7 Sep 2020 at 19:55, Jark Wu <[hidden email]> wrote: > >> Thanks Timo, >> >> I think this FLIP is already in great shape! >> >> I have following questions: >> >> 1. `Map<String, DataType> listReadableMetadata()` only allows one possible >> DataType for a metadata key. >> However, users may expect to use different types, e.g. for "timestamp" >> metadata, users may use it as BIGINT, or TIMESTAMP(6) WITH LOCAL TIME ZONE >> or TIMESTAMP(3) WITH LOCAL TIME ZONE. >> Do we force users to use the specific types or can use several types in >> the CAST? >> >> 2. Why does the `DecodingFormat#applyReadableMetadata(List<String> >> metadataKeys)` don't need the `DataType outputDataType` parameter? >> >> 3. I think it would be great if we can list the metadata keys (and >> readable/writable) we want to expose in the first version. I think they are >> also important public APIs, like connector options? >> >> Best, >> Jark >> >> On Mon, 7 Sep 2020 at 18:28, Timo Walther <[hidden email]> wrote: >> >>> Hi Leonard, >>> >>> thanks for your feedback. >>> >>> (1) Actually, I discuss this already in the FLIP. But let me summarize >>> our options again if it was not clear enough in the FLIP: >>> >>> a) CREATE TABLE t (a AS CAST(SYSTEM_METADATA("offset") AS INT)) >>> pro: readable, complex arithmetic possible, more SQL compliant, SQL >>> Server compliant >>> con: long >>> >>> b) CREATE TABLE t (a INT AS SYSTEM_METADATA("offset")) >>> pro: shorter, not SQL nor SQL Server compliant >>> con: requires parser changes, no complex arithmetic like >>> `computeSomeThing(SYSTEM_METADATA("offset"))` possible >>> >>> c) CREATE TABLE t (a AS SYSTEM_METADATA("offset", INT)) >>> pro: shorter, very readable, complex arithmetic possible >>> con: non SQL expression, requires parser changes >>> >>> So I decided for a) with less disadvantages. >>> >>> 2) Yes, a format can expose its metadata through the mentioned >>> interfaces in the FLIP. I added an example to the FLIP. >>> >>> 3) The concept of a key or value format is connector specific. And since >>> the table source/table sinks are responsible for returning the metadata >>> columns. We can allow this in the future due to the flexibility of the >>> design. But I also don't think that we need this case for now. I think >>> we can focus on the value format and ignore metadata from the key. >>> >>> Regards, >>> Timo >>> >>> >>> On 07.09.20 11:03, Leonard Xu wrote: >>>> Ignore my question(4), I’ve found the answer in the doc : >>> 'value.fields-include' = ‘EXCEPT_KEY' (all fields of the schema minus >>> fields of the key) >>>> >>>>> 在 2020年9月7日,16:33,Leonard Xu <[hidden email]> 写道: >>>>> >>>>> (4) About Reading and writing from key and value section, we bind that >>> the fields of key part must belong to the fields of value part according to >>> the options 'key.fields' = 'id, name' and 'value.fields-include' = 'ALL', >>> Is this by design? I think the key fields and value fields are independent >>> each other in Kafka. >>>>> >>>> >>>> >>> >>> > |
Free forum by Nabble | Edit this page |