Hi all,
FLINK-12254[1] [2] updated TableSink and related interfaces to new type system which allows connectors use the new type system based on DataTypes. But FLINK-12911 port UpsertStreamTableSink and RetractStreamTableSink to flink-api-java-bridge and returns TypeInformation of the requested record type which can't support types with precision and scale, e.g. TIMESTAMP(p), DECIMAL(p,s). /** * Returns the requested record type. */ TypeInformation<T> getRecordType(); A proposal is deprecating the *getRecordType* API and adding a *getRecordDataType* API instead to return the data type of the requested record. I have filed the issue FLINK-15469 and an initial PR to verify it. What do you think about this API changes? Any feedback are appreciated. [1] https://issues.apache.org/jira/browse/FLINK-12254 [2] https://github.com/apache/flink/pull/8596 [3] https://issues.apache.org/jira/browse/FLINK-15469 *Best Regards,* *Zhenghua Gao* |
Would overriding `getConsumedDataType` do the job?
Best, Kurt On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao <[hidden email]> wrote: > Hi all, > > FLINK-12254[1] [2] updated TableSink and related interfaces to new type > system which > allows connectors use the new type system based on DataTypes. > > But FLINK-12911 port UpsertStreamTableSink and RetractStreamTableSink to > flink-api-java-bridge and returns TypeInformation of the requested record > type which > can't support types with precision and scale, e.g. TIMESTAMP(p), > DECIMAL(p,s). > > /** > * Returns the requested record type. > */ > TypeInformation<T> getRecordType(); > > > A proposal is deprecating the *getRecordType* API and adding a > *getRecordDataType* API instead to return the data type of the requested > record. I have filed the issue FLINK-15469 and > an initial PR to verify it. > > What do you think about this API changes? Any feedback are appreciated. > [1] https://issues.apache.org/jira/browse/FLINK-12254 > [2] https://github.com/apache/flink/pull/8596 > [3] https://issues.apache.org/jira/browse/FLINK-15469 > > *Best Regards,* > *Zhenghua Gao* > |
Hi Zhenghua,
The *getRecordDataType* looks good to me. But the main problem is how to represent the tuple type in DataType. I understand that it is necessary to use StructuredType, but at present, planner does not support StructuredType, so the other way is to support StructuredType. Best, Jingsong Lee On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <[hidden email]> wrote: > Would overriding `getConsumedDataType` do the job? > > Best, > Kurt > > > On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao <[hidden email]> wrote: > >> Hi all, >> >> FLINK-12254[1] [2] updated TableSink and related interfaces to new type >> system which >> allows connectors use the new type system based on DataTypes. >> >> But FLINK-12911 port UpsertStreamTableSink and RetractStreamTableSink to >> flink-api-java-bridge and returns TypeInformation of the requested record >> type which >> can't support types with precision and scale, e.g. TIMESTAMP(p), >> DECIMAL(p,s). >> >> /** >> * Returns the requested record type. >> */ >> TypeInformation<T> getRecordType(); >> >> >> A proposal is deprecating the *getRecordType* API and adding a >> *getRecordDataType* API instead to return the data type of the requested >> record. I have filed the issue FLINK-15469 and >> an initial PR to verify it. >> >> What do you think about this API changes? Any feedback are appreciated. >> [1] https://issues.apache.org/jira/browse/FLINK-12254 >> [2] https://github.com/apache/flink/pull/8596 >> [3] https://issues.apache.org/jira/browse/FLINK-15469 >> >> *Best Regards,* >> *Zhenghua Gao* >> > -- Best, Jingsong Lee |
Hi Jingsong, For now, only UpsertStreamTableSink and
RetractStreamTableSink consumes JTuple2 So the 'getConsumedDataType' interface is not necessary in validate & codegen phase. See https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52 and https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304 What about stay the same to use RAW type? *Best Regards,* *Zhenghua Gao* On Mon, Feb 3, 2020 at 4:59 PM Jingsong Li <[hidden email]> wrote: > Hi Zhenghua, > > The *getRecordDataType* looks good to me. > > But the main problem is how to represent the tuple type in DataType. I > understand that it is necessary to use StructuredType, but at present, > planner does not support StructuredType, so the other way is to support > StructuredType. > > Best, > Jingsong Lee > > On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <[hidden email]> wrote: > > > Would overriding `getConsumedDataType` do the job? > > > > Best, > > Kurt > > > > > > On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao <[hidden email]> wrote: > > > >> Hi all, > >> > >> FLINK-12254[1] [2] updated TableSink and related interfaces to new type > >> system which > >> allows connectors use the new type system based on DataTypes. > >> > >> But FLINK-12911 port UpsertStreamTableSink and RetractStreamTableSink to > >> flink-api-java-bridge and returns TypeInformation of the requested > record > >> type which > >> can't support types with precision and scale, e.g. TIMESTAMP(p), > >> DECIMAL(p,s). > >> > >> /** > >> * Returns the requested record type. > >> */ > >> TypeInformation<T> getRecordType(); > >> > >> > >> A proposal is deprecating the *getRecordType* API and adding a > >> *getRecordDataType* API instead to return the data type of the requested > >> record. I have filed the issue FLINK-15469 and > >> an initial PR to verify it. > >> > >> What do you think about this API changes? Any feedback are appreciated. > >> [1] https://issues.apache.org/jira/browse/FLINK-12254 > >> [2] https://github.com/apache/flink/pull/8596 > >> [3] https://issues.apache.org/jira/browse/FLINK-15469 > >> > >> *Best Regards,* > >> *Zhenghua Gao* > >> > > > > -- > Best, Jingsong Lee > |
Thanks Zhenghua for starting this discussion.
Currently, all the UpsertStreamTableSinks can't upgrade to the new type system which affects usability a lot. I hope we can fix that in 1.11. I'm find with *getRecordDataType* for a temporary solution. IIUC, the framework will only recognize getRecordDataType and ignore getConsumedDataType for UpsertStreamTableSink, is that right? I guess Timo are planning to design a new source/sink interface which will also fix this problem, but I'm not sure the timelines. cc @Timo It would be better if we can have a new and complete interface, because getRecordDataType is little confused as UpsertStreamTableSink already has three getXXXType(). Best, Jark On Mon, 3 Feb 2020 at 17:48, Zhenghua Gao <[hidden email]> wrote: > Hi Jingsong, For now, only UpsertStreamTableSink and > RetractStreamTableSink consumes JTuple2 > So the 'getConsumedDataType' interface is not necessary in validate & > codegen phase. > See > > https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52 > and > > https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304 > > What about stay the same to use RAW type? > > *Best Regards,* > *Zhenghua Gao* > > > On Mon, Feb 3, 2020 at 4:59 PM Jingsong Li <[hidden email]> wrote: > > > Hi Zhenghua, > > > > The *getRecordDataType* looks good to me. > > > > But the main problem is how to represent the tuple type in DataType. I > > understand that it is necessary to use StructuredType, but at present, > > planner does not support StructuredType, so the other way is to support > > StructuredType. > > > > Best, > > Jingsong Lee > > > > On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <[hidden email]> wrote: > > > > > Would overriding `getConsumedDataType` do the job? > > > > > > Best, > > > Kurt > > > > > > > > > On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao <[hidden email]> wrote: > > > > > >> Hi all, > > >> > > >> FLINK-12254[1] [2] updated TableSink and related interfaces to new > type > > >> system which > > >> allows connectors use the new type system based on DataTypes. > > >> > > >> But FLINK-12911 port UpsertStreamTableSink and RetractStreamTableSink > to > > >> flink-api-java-bridge and returns TypeInformation of the requested > > record > > >> type which > > >> can't support types with precision and scale, e.g. TIMESTAMP(p), > > >> DECIMAL(p,s). > > >> > > >> /** > > >> * Returns the requested record type. > > >> */ > > >> TypeInformation<T> getRecordType(); > > >> > > >> > > >> A proposal is deprecating the *getRecordType* API and adding a > > >> *getRecordDataType* API instead to return the data type of the > requested > > >> record. I have filed the issue FLINK-15469 and > > >> an initial PR to verify it. > > >> > > >> What do you think about this API changes? Any feedback are > appreciated. > > >> [1] https://issues.apache.org/jira/browse/FLINK-12254 > > >> [2] https://github.com/apache/flink/pull/8596 > > >> [3] https://issues.apache.org/jira/browse/FLINK-15469 > > >> > > >> *Best Regards,* > > >> *Zhenghua Gao* > > >> > > > > > > > -- > > Best, Jingsong Lee > > > |
In reply to this post by Kurt Young
Should we distinguish *record data type* and *consumed data type*?
Currently the design of UpsertStreamTableSink and RetractStreamTableSink DO distinguish them. In my proposal the framework will ignore *getConsumedDataType*, so it's ok to use *getConsumedDataType* to do the job if we don't distinguish *record data type* and *consumed data type*. *Best Regards,* *Zhenghua Gao* On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <[hidden email]> wrote: > Would overriding `getConsumedDataType` do the job? > > Best, > Kurt > > > On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao <[hidden email]> wrote: > > > Hi all, > > > > FLINK-12254[1] [2] updated TableSink and related interfaces to new type > > system which > > allows connectors use the new type system based on DataTypes. > > > > But FLINK-12911 port UpsertStreamTableSink and RetractStreamTableSink to > > flink-api-java-bridge and returns TypeInformation of the requested record > > type which > > can't support types with precision and scale, e.g. TIMESTAMP(p), > > DECIMAL(p,s). > > > > /** > > * Returns the requested record type. > > */ > > TypeInformation<T> getRecordType(); > > > > > > A proposal is deprecating the *getRecordType* API and adding a > > *getRecordDataType* API instead to return the data type of the requested > > record. I have filed the issue FLINK-15469 and > > an initial PR to verify it. > > > > What do you think about this API changes? Any feedback are appreciated. > > [1] https://issues.apache.org/jira/browse/FLINK-12254 > > [2] https://github.com/apache/flink/pull/8596 > > [3] https://issues.apache.org/jira/browse/FLINK-15469 > > > > *Best Regards,* > > *Zhenghua Gao* > > > |
In reply to this post by Jark Wu-2
Hi Jark, thanks for your comments.
>>>IIUC, the framework will only recognize getRecordDataType and >>>ignore getConsumedDataType for UpsertStreamTableSink, is that right? Your are right. >>>getRecordDataType is little confused as UpsertStreamTableSink already has >>>three getXXXType(). the getRecordType and getOutputType is deprecated and mainly for backward compatibility. *Best Regards,* *Zhenghua Gao* On Mon, Feb 3, 2020 at 10:11 PM Jark Wu <[hidden email]> wrote: > Thanks Zhenghua for starting this discussion. > > Currently, all the UpsertStreamTableSinks can't upgrade to the new type > system which affects usability a lot. > I hope we can fix that in 1.11. > > I'm find with *getRecordDataType* for a temporary solution. > IIUC, the framework will only recognize getRecordDataType and > ignore getConsumedDataType for UpsertStreamTableSink, is that right? > > I guess Timo are planning to design a new source/sink interface which will > also fix this problem, but I'm not sure the timelines. cc @Timo > It would be better if we can have a new and complete interface, because > getRecordDataType is little confused as UpsertStreamTableSink already has > three getXXXType(). > > Best, > Jark > > > On Mon, 3 Feb 2020 at 17:48, Zhenghua Gao <[hidden email]> wrote: > > > Hi Jingsong, For now, only UpsertStreamTableSink and > > RetractStreamTableSink consumes JTuple2 > > So the 'getConsumedDataType' interface is not necessary in validate & > > codegen phase. > > See > > > > > https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52 > > and > > > > > https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304 > > > > What about stay the same to use RAW type? > > > > *Best Regards,* > > *Zhenghua Gao* > > > > > > On Mon, Feb 3, 2020 at 4:59 PM Jingsong Li <[hidden email]> > wrote: > > > > > Hi Zhenghua, > > > > > > The *getRecordDataType* looks good to me. > > > > > > But the main problem is how to represent the tuple type in DataType. I > > > understand that it is necessary to use StructuredType, but at present, > > > planner does not support StructuredType, so the other way is to support > > > StructuredType. > > > > > > Best, > > > Jingsong Lee > > > > > > On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <[hidden email]> wrote: > > > > > > > Would overriding `getConsumedDataType` do the job? > > > > > > > > Best, > > > > Kurt > > > > > > > > > > > > On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao <[hidden email]> > wrote: > > > > > > > >> Hi all, > > > >> > > > >> FLINK-12254[1] [2] updated TableSink and related interfaces to new > > type > > > >> system which > > > >> allows connectors use the new type system based on DataTypes. > > > >> > > > >> But FLINK-12911 port UpsertStreamTableSink and > RetractStreamTableSink > > to > > > >> flink-api-java-bridge and returns TypeInformation of the requested > > > record > > > >> type which > > > >> can't support types with precision and scale, e.g. TIMESTAMP(p), > > > >> DECIMAL(p,s). > > > >> > > > >> /** > > > >> * Returns the requested record type. > > > >> */ > > > >> TypeInformation<T> getRecordType(); > > > >> > > > >> > > > >> A proposal is deprecating the *getRecordType* API and adding a > > > >> *getRecordDataType* API instead to return the data type of the > > requested > > > >> record. I have filed the issue FLINK-15469 and > > > >> an initial PR to verify it. > > > >> > > > >> What do you think about this API changes? Any feedback are > > appreciated. > > > >> [1] https://issues.apache.org/jira/browse/FLINK-12254 > > > >> [2] https://github.com/apache/flink/pull/8596 > > > >> [3] https://issues.apache.org/jira/browse/FLINK-15469 > > > >> > > > >> *Best Regards,* > > > >> *Zhenghua Gao* > > > >> > > > > > > > > > > -- > > > Best, Jingsong Lee > > > > > > |
Hi Zhenghua,
Jark is right. The reason why we haven't updated those interfaces yet is because we are actually would like to introduce new interfaces. We should target new interfaces in this release. Even a short-term fix as you proposed with `getRecordDataType` does actually not help as Jingsong pointed out because we cannot represent tuples in DataType and are also not planning to support them natively but only as a structured type in the future. In my envisioned design, the new sink interface should just always get a `ChangeRow` which is never serialized and just a data structure for communicating between the wrapping sink function and the returned sink function by the table sink. Let me sketch a rough design document that I will share with you shortly. Then we could also discuss alternatives. Thanks, Timo On 04.02.20 04:18, Zhenghua Gao wrote: > Hi Jark, thanks for your comments. >>>> IIUC, the framework will only recognize getRecordDataType and >>>> ignore getConsumedDataType for UpsertStreamTableSink, is that right? > Your are right. > >>>> getRecordDataType is little confused as UpsertStreamTableSink already has >>>> three getXXXType(). > the getRecordType and getOutputType is deprecated and mainly for backward > compatibility. > > *Best Regards,* > *Zhenghua Gao* > > > On Mon, Feb 3, 2020 at 10:11 PM Jark Wu <[hidden email]> wrote: > >> Thanks Zhenghua for starting this discussion. >> >> Currently, all the UpsertStreamTableSinks can't upgrade to the new type >> system which affects usability a lot. >> I hope we can fix that in 1.11. >> >> I'm find with *getRecordDataType* for a temporary solution. >> IIUC, the framework will only recognize getRecordDataType and >> ignore getConsumedDataType for UpsertStreamTableSink, is that right? >> >> I guess Timo are planning to design a new source/sink interface which will >> also fix this problem, but I'm not sure the timelines. cc @Timo >> It would be better if we can have a new and complete interface, because >> getRecordDataType is little confused as UpsertStreamTableSink already has >> three getXXXType(). >> >> Best, >> Jark >> >> >> On Mon, 3 Feb 2020 at 17:48, Zhenghua Gao <[hidden email]> wrote: >> >>> Hi Jingsong, For now, only UpsertStreamTableSink and >>> RetractStreamTableSink consumes JTuple2 >>> So the 'getConsumedDataType' interface is not necessary in validate & >>> codegen phase. >>> See >>> >>> >> https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52 >>> and >>> >>> >> https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304 >>> >>> What about stay the same to use RAW type? >>> >>> *Best Regards,* >>> *Zhenghua Gao* >>> >>> >>> On Mon, Feb 3, 2020 at 4:59 PM Jingsong Li <[hidden email]> >> wrote: >>> >>>> Hi Zhenghua, >>>> >>>> The *getRecordDataType* looks good to me. >>>> >>>> But the main problem is how to represent the tuple type in DataType. I >>>> understand that it is necessary to use StructuredType, but at present, >>>> planner does not support StructuredType, so the other way is to support >>>> StructuredType. >>>> >>>> Best, >>>> Jingsong Lee >>>> >>>> On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <[hidden email]> wrote: >>>> >>>>> Would overriding `getConsumedDataType` do the job? >>>>> >>>>> Best, >>>>> Kurt >>>>> >>>>> >>>>> On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao <[hidden email]> >> wrote: >>>>> >>>>>> Hi all, >>>>>> >>>>>> FLINK-12254[1] [2] updated TableSink and related interfaces to new >>> type >>>>>> system which >>>>>> allows connectors use the new type system based on DataTypes. >>>>>> >>>>>> But FLINK-12911 port UpsertStreamTableSink and >> RetractStreamTableSink >>> to >>>>>> flink-api-java-bridge and returns TypeInformation of the requested >>>> record >>>>>> type which >>>>>> can't support types with precision and scale, e.g. TIMESTAMP(p), >>>>>> DECIMAL(p,s). >>>>>> >>>>>> /** >>>>>> * Returns the requested record type. >>>>>> */ >>>>>> TypeInformation<T> getRecordType(); >>>>>> >>>>>> >>>>>> A proposal is deprecating the *getRecordType* API and adding a >>>>>> *getRecordDataType* API instead to return the data type of the >>> requested >>>>>> record. I have filed the issue FLINK-15469 and >>>>>> an initial PR to verify it. >>>>>> >>>>>> What do you think about this API changes? Any feedback are >>> appreciated. >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-12254 >>>>>> [2] https://github.com/apache/flink/pull/8596 >>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15469 >>>>>> >>>>>> *Best Regards,* >>>>>> *Zhenghua Gao* >>>>>> >>>>> >>>> >>>> -- >>>> Best, Jingsong Lee >>>> >>> >> > |
Cool! Looking forward to the design doc.
Best, Jark On Fri, 7 Feb 2020 at 17:26, Timo Walther <[hidden email]> wrote: > Hi Zhenghua, > > Jark is right. The reason why we haven't updated those interfaces yet is > because we are actually would like to introduce new interfaces. We > should target new interfaces in this release. Even a short-term fix as > you proposed with `getRecordDataType` does actually not help as Jingsong > pointed out because we cannot represent tuples in DataType and are also > not planning to support them natively but only as a structured type in > the future. > > In my envisioned design, the new sink interface should just always get a > `ChangeRow` which is never serialized and just a data structure for > communicating between the wrapping sink function and the returned sink > function by the table sink. > > Let me sketch a rough design document that I will share with you > shortly. Then we could also discuss alternatives. > > Thanks, > Timo > > > On 04.02.20 04:18, Zhenghua Gao wrote: > > Hi Jark, thanks for your comments. > >>>> IIUC, the framework will only recognize getRecordDataType and > >>>> ignore getConsumedDataType for UpsertStreamTableSink, is that right? > > Your are right. > > > >>>> getRecordDataType is little confused as UpsertStreamTableSink already > has > >>>> three getXXXType(). > > the getRecordType and getOutputType is deprecated and mainly for backward > > compatibility. > > > > *Best Regards,* > > *Zhenghua Gao* > > > > > > On Mon, Feb 3, 2020 at 10:11 PM Jark Wu <[hidden email]> wrote: > > > >> Thanks Zhenghua for starting this discussion. > >> > >> Currently, all the UpsertStreamTableSinks can't upgrade to the new type > >> system which affects usability a lot. > >> I hope we can fix that in 1.11. > >> > >> I'm find with *getRecordDataType* for a temporary solution. > >> IIUC, the framework will only recognize getRecordDataType and > >> ignore getConsumedDataType for UpsertStreamTableSink, is that right? > >> > >> I guess Timo are planning to design a new source/sink interface which > will > >> also fix this problem, but I'm not sure the timelines. cc @Timo > >> It would be better if we can have a new and complete interface, because > >> getRecordDataType is little confused as UpsertStreamTableSink already > has > >> three getXXXType(). > >> > >> Best, > >> Jark > >> > >> > >> On Mon, 3 Feb 2020 at 17:48, Zhenghua Gao <[hidden email]> wrote: > >> > >>> Hi Jingsong, For now, only UpsertStreamTableSink and > >>> RetractStreamTableSink consumes JTuple2 > >>> So the 'getConsumedDataType' interface is not necessary in validate & > >>> codegen phase. > >>> See > >>> > >>> > >> > https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52 > >>> and > >>> > >>> > >> > https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304 > >>> > >>> What about stay the same to use RAW type? > >>> > >>> *Best Regards,* > >>> *Zhenghua Gao* > >>> > >>> > >>> On Mon, Feb 3, 2020 at 4:59 PM Jingsong Li <[hidden email]> > >> wrote: > >>> > >>>> Hi Zhenghua, > >>>> > >>>> The *getRecordDataType* looks good to me. > >>>> > >>>> But the main problem is how to represent the tuple type in DataType. I > >>>> understand that it is necessary to use StructuredType, but at present, > >>>> planner does not support StructuredType, so the other way is to > support > >>>> StructuredType. > >>>> > >>>> Best, > >>>> Jingsong Lee > >>>> > >>>> On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <[hidden email]> wrote: > >>>> > >>>>> Would overriding `getConsumedDataType` do the job? > >>>>> > >>>>> Best, > >>>>> Kurt > >>>>> > >>>>> > >>>>> On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao <[hidden email]> > >> wrote: > >>>>> > >>>>>> Hi all, > >>>>>> > >>>>>> FLINK-12254[1] [2] updated TableSink and related interfaces to new > >>> type > >>>>>> system which > >>>>>> allows connectors use the new type system based on DataTypes. > >>>>>> > >>>>>> But FLINK-12911 port UpsertStreamTableSink and > >> RetractStreamTableSink > >>> to > >>>>>> flink-api-java-bridge and returns TypeInformation of the requested > >>>> record > >>>>>> type which > >>>>>> can't support types with precision and scale, e.g. TIMESTAMP(p), > >>>>>> DECIMAL(p,s). > >>>>>> > >>>>>> /** > >>>>>> * Returns the requested record type. > >>>>>> */ > >>>>>> TypeInformation<T> getRecordType(); > >>>>>> > >>>>>> > >>>>>> A proposal is deprecating the *getRecordType* API and adding a > >>>>>> *getRecordDataType* API instead to return the data type of the > >>> requested > >>>>>> record. I have filed the issue FLINK-15469 and > >>>>>> an initial PR to verify it. > >>>>>> > >>>>>> What do you think about this API changes? Any feedback are > >>> appreciated. > >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-12254 > >>>>>> [2] https://github.com/apache/flink/pull/8596 > >>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15469 > >>>>>> > >>>>>> *Best Regards,* > >>>>>> *Zhenghua Gao* > >>>>>> > >>>>> > >>>> > >>>> -- > >>>> Best, Jingsong Lee > >>>> > >>> > >> > > > > |
In reply to this post by Timo Walther-2
Thanks Timo! Look forward your design!
*Best Regards,* *Zhenghua Gao* On Fri, Feb 7, 2020 at 5:26 PM Timo Walther <[hidden email]> wrote: > Hi Zhenghua, > > Jark is right. The reason why we haven't updated those interfaces yet is > because we are actually would like to introduce new interfaces. We > should target new interfaces in this release. Even a short-term fix as > you proposed with `getRecordDataType` does actually not help as Jingsong > pointed out because we cannot represent tuples in DataType and are also > not planning to support them natively but only as a structured type in > the future. > > In my envisioned design, the new sink interface should just always get a > `ChangeRow` which is never serialized and just a data structure for > communicating between the wrapping sink function and the returned sink > function by the table sink. > > Let me sketch a rough design document that I will share with you > shortly. Then we could also discuss alternatives. > > Thanks, > Timo > > > On 04.02.20 04:18, Zhenghua Gao wrote: > > Hi Jark, thanks for your comments. > >>>> IIUC, the framework will only recognize getRecordDataType and > >>>> ignore getConsumedDataType for UpsertStreamTableSink, is that right? > > Your are right. > > > >>>> getRecordDataType is little confused as UpsertStreamTableSink already > has > >>>> three getXXXType(). > > the getRecordType and getOutputType is deprecated and mainly for backward > > compatibility. > > > > *Best Regards,* > > *Zhenghua Gao* > > > > > > On Mon, Feb 3, 2020 at 10:11 PM Jark Wu <[hidden email]> wrote: > > > >> Thanks Zhenghua for starting this discussion. > >> > >> Currently, all the UpsertStreamTableSinks can't upgrade to the new type > >> system which affects usability a lot. > >> I hope we can fix that in 1.11. > >> > >> I'm find with *getRecordDataType* for a temporary solution. > >> IIUC, the framework will only recognize getRecordDataType and > >> ignore getConsumedDataType for UpsertStreamTableSink, is that right? > >> > >> I guess Timo are planning to design a new source/sink interface which > will > >> also fix this problem, but I'm not sure the timelines. cc @Timo > >> It would be better if we can have a new and complete interface, because > >> getRecordDataType is little confused as UpsertStreamTableSink already > has > >> three getXXXType(). > >> > >> Best, > >> Jark > >> > >> > >> On Mon, 3 Feb 2020 at 17:48, Zhenghua Gao <[hidden email]> wrote: > >> > >>> Hi Jingsong, For now, only UpsertStreamTableSink and > >>> RetractStreamTableSink consumes JTuple2 > >>> So the 'getConsumedDataType' interface is not necessary in validate & > >>> codegen phase. > >>> See > >>> > >>> > >> > https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52 > >>> and > >>> > >>> > >> > https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304 > >>> > >>> What about stay the same to use RAW type? > >>> > >>> *Best Regards,* > >>> *Zhenghua Gao* > >>> > >>> > >>> On Mon, Feb 3, 2020 at 4:59 PM Jingsong Li <[hidden email]> > >> wrote: > >>> > >>>> Hi Zhenghua, > >>>> > >>>> The *getRecordDataType* looks good to me. > >>>> > >>>> But the main problem is how to represent the tuple type in DataType. I > >>>> understand that it is necessary to use StructuredType, but at present, > >>>> planner does not support StructuredType, so the other way is to > support > >>>> StructuredType. > >>>> > >>>> Best, > >>>> Jingsong Lee > >>>> > >>>> On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <[hidden email]> wrote: > >>>> > >>>>> Would overriding `getConsumedDataType` do the job? > >>>>> > >>>>> Best, > >>>>> Kurt > >>>>> > >>>>> > >>>>> On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao <[hidden email]> > >> wrote: > >>>>> > >>>>>> Hi all, > >>>>>> > >>>>>> FLINK-12254[1] [2] updated TableSink and related interfaces to new > >>> type > >>>>>> system which > >>>>>> allows connectors use the new type system based on DataTypes. > >>>>>> > >>>>>> But FLINK-12911 port UpsertStreamTableSink and > >> RetractStreamTableSink > >>> to > >>>>>> flink-api-java-bridge and returns TypeInformation of the requested > >>>> record > >>>>>> type which > >>>>>> can't support types with precision and scale, e.g. TIMESTAMP(p), > >>>>>> DECIMAL(p,s). > >>>>>> > >>>>>> /** > >>>>>> * Returns the requested record type. > >>>>>> */ > >>>>>> TypeInformation<T> getRecordType(); > >>>>>> > >>>>>> > >>>>>> A proposal is deprecating the *getRecordType* API and adding a > >>>>>> *getRecordDataType* API instead to return the data type of the > >>> requested > >>>>>> record. I have filed the issue FLINK-15469 and > >>>>>> an initial PR to verify it. > >>>>>> > >>>>>> What do you think about this API changes? Any feedback are > >>> appreciated. > >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-12254 > >>>>>> [2] https://github.com/apache/flink/pull/8596 > >>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15469 > >>>>>> > >>>>>> *Best Regards,* > >>>>>> *Zhenghua Gao* > >>>>>> > >>>>> > >>>> > >>>> -- > >>>> Best, Jingsong Lee > >>>> > >>> > >> > > > > |
Free forum by Nabble | Edit this page |