[DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Zhenghua Gao
Hi all,

FLINK-12254 [1] [2] updated TableSink and related interfaces to the new type
system, which allows connectors to use the new type system based on DataTypes.

But FLINK-12911 ported UpsertStreamTableSink and RetractStreamTableSink to
flink-api-java-bridge, and they still return the TypeInformation of the
requested record type, which can't support types with precision and scale,
e.g. TIMESTAMP(p) and DECIMAL(p,s).

/**
 * Returns the requested record type.
 */
TypeInformation<T> getRecordType();
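
To make the limitation concrete, here is a minimal sketch of why a class-based descriptor loses precision and scale. `ClassBasedType` and `DecimalType` are hypothetical stand-ins written for this example, not Flink's TypeInformation or DataType classes; the only assumption is that the legacy descriptor is keyed on the Java class and so has nowhere to store p and s.

```java
import java.math.BigDecimal;

/** Illustration only: hypothetical stand-ins, not Flink classes. */
final class PrecisionLossSketch {

    /** Stand-in for a descriptor that only knows the Java class of a value. */
    static final class ClassBasedType {
        final Class<?> clazz;

        ClassBasedType(Class<?> clazz) {
            this.clazz = clazz;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ClassBasedType && ((ClassBasedType) o).clazz == clazz;
        }

        @Override
        public int hashCode() {
            return clazz.hashCode();
        }
    }

    /** Stand-in for a logical DECIMAL(p, s) type that keeps precision and scale. */
    static final class DecimalType {
        final int precision;
        final int scale;

        DecimalType(int precision, int scale) {
            this.precision = precision;
            this.scale = scale;
        }

        /** Converting to the class-based form drops precision and scale. */
        ClassBasedType toClassBasedType() {
            return new ClassBasedType(BigDecimal.class);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof DecimalType
                    && ((DecimalType) o).precision == precision
                    && ((DecimalType) o).scale == scale;
        }

        @Override
        public int hashCode() {
            return 31 * precision + scale;
        }
    }

    public static void main(String[] args) {
        DecimalType price = new DecimalType(10, 2);
        DecimalType ratio = new DecimalType(38, 18);
        // The logical types differ ...
        System.out.println(price.equals(ratio));
        // ... but their class-based descriptors are indistinguishable.
        System.out.println(price.toClassBasedType().equals(ratio.toClassBasedType()));
    }
}
```

Once the sink has reported only the class-based form, the planner can no longer tell DECIMAL(10,2) from DECIMAL(38,18).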


One proposal is to deprecate the *getRecordType* API and add a
*getRecordDataType* API that returns the data type of the requested
record. I have filed the issue FLINK-15469 [3] and
an initial PR to verify it.

What do you think about this API change? Any feedback is appreciated.
[1] https://issues.apache.org/jira/browse/FLINK-12254
[2] https://github.com/apache/flink/pull/8596
[3] https://issues.apache.org/jira/browse/FLINK-15469

*Best Regards,*
*Zhenghua Gao*

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Kurt Young
Would overriding `getConsumedDataType` do the job?

Best,
Kurt


On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao <[hidden email]> wrote:

> Hi all,
>
> FLINK-12254[1] [2] updated TableSink and related interfaces to new type
> system which
> allows connectors use the new type system based on DataTypes.
>
> But FLINK-12911 port UpsertStreamTableSink and RetractStreamTableSink to
> flink-api-java-bridge and returns TypeInformation of the requested record
> type which
> can't support types with precision and scale, e.g. TIMESTAMP(p),
> DECIMAL(p,s).
>
> /**
>  * Returns the requested record type.
>  */
> TypeInformation<T> getRecordType();
>
>
> A proposal is deprecating the *getRecordType* API and adding a
> *getRecordDataType* API instead to return the data type of the requested
> record. I have filed the issue FLINK-15469 and
> an initial PR to verify it.
>
> What do you think about this API changes? Any feedback are appreciated.
> [1] https://issues.apache.org/jira/browse/FLINK-12254
> [2] https://github.com/apache/flink/pull/8596
> [3] https://issues.apache.org/jira/browse/FLINK-15469
>
> *Best Regards,*
> *Zhenghua Gao*
>

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Jingsong Li
Hi Zhenghua,

The *getRecordDataType* looks good to me.

But the main problem is how to represent the tuple type in DataType. As I
understand it, this requires StructuredType, but at present the
planner does not support StructuredType, so the alternative is to add
StructuredType support to the planner first.

Best,
Jingsong Lee

On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <[hidden email]> wrote:

> Would overriding `getConsumedDataType` do the job?
>
> Best,
> Kurt

--
Best, Jingsong Lee

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Zhenghua Gao
Hi Jingsong,

For now, only UpsertStreamTableSink and RetractStreamTableSink consume
JTuple2, so the 'getConsumedDataType' interface is not needed in the
validation and codegen phases.
See
https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52
and
https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304

What about keeping the current behavior and using the RAW type?

*Best Regards,*
*Zhenghua Gao*


On Mon, Feb 3, 2020 at 4:59 PM Jingsong Li <[hidden email]> wrote:

> Hi Zhenghua,
>
> The *getRecordDataType* looks good to me.
>
> But the main problem is how to represent the tuple type in DataType. I
> understand that it is necessary to use StructuredType, but at present,
> planner does not support StructuredType, so the other way is to support
> StructuredType.
>
> Best,
> Jingsong Lee

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Jark Wu-2
Thanks Zhenghua for starting this discussion.

Currently, none of the UpsertStreamTableSinks can be upgraded to the new type
system, which affects usability a lot.
I hope we can fix that in 1.11.

I'm fine with *getRecordDataType* as a temporary solution.
IIUC, the framework will only recognize getRecordDataType and
ignore getConsumedDataType for UpsertStreamTableSink, is that right?

I guess Timo is planning to design a new source/sink interface which will
also fix this problem, but I'm not sure about the timeline. cc @Timo
It would be better if we could have a new and complete interface, because
getRecordDataType is a little confusing, as UpsertStreamTableSink already has
three getXXXType() methods.

Best,
Jark


On Mon, 3 Feb 2020 at 17:48, Zhenghua Gao <[hidden email]> wrote:

> Hi Jingsong,  For now, only UpsertStreamTableSink and
> RetractStreamTableSink consumes JTuple2
> So the 'getConsumedDataType' interface is not necessary in validate &
> codegen phase.
> See
>
> https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52
>  and
>
> https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304
>
> What about stay the same to use RAW type?
>
> *Best Regards,*
> *Zhenghua Gao*

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Zhenghua Gao
In reply to this post by Kurt Young
Should we distinguish between the *record data type* and the *consumed data type*?
Currently the design of UpsertStreamTableSink and RetractStreamTableSink
DOES distinguish them.

In my proposal the framework will ignore *getConsumedDataType*,
so it's OK to use *getConsumedDataType* to do the job if we
don't distinguish between the *record data type* and the *consumed data type*.

*Best Regards,*
*Zhenghua Gao*


On Mon, Feb 3, 2020 at 4:49 PM Kurt Young <[hidden email]> wrote:

> Would overriding `getConsumedDataType` do the job?
>
> Best,
> Kurt

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Zhenghua Gao
In reply to this post by Jark Wu-2
Hi Jark, thanks for your comments.
>>>IIUC, the framework will only recognize getRecordDataType and
>>>ignore getConsumedDataType for UpsertStreamTableSink, is that right?
You are right.

>>>getRecordDataType is little confused as UpsertStreamTableSink already has
>>>three getXXXType().
getRecordType and getOutputType are deprecated and kept mainly for backward
compatibility.
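
As a rough sketch of how a deprecated getter and its replacement could coexist, the example below lets the framework side ask only for the new-style type, while existing sinks fall back through a default method. Everything here is a hypothetical stand-in (`LegacyTypeInfo`, `DataTypeLike`, `UpsertSinkLike`, `resolve`); it is one possible shape under those assumptions and not necessarily what the PR for FLINK-15469 does.

```java
/** Illustration only: hypothetical stand-ins, not the Flink interfaces. */
final class RecordDataTypeSketch {

    /** Stand-in for the legacy, class-based type descriptor. */
    static final class LegacyTypeInfo {
        final String name;

        LegacyTypeInfo(String name) {
            this.name = name;
        }
    }

    /** Stand-in for a new-style data type that can carry precision and scale. */
    static final class DataTypeLike {
        final String name;

        DataTypeLike(String name) {
            this.name = name;
        }

        /** Bridges a legacy descriptor; no precision can be recovered here. */
        static DataTypeLike fromLegacy(LegacyTypeInfo info) {
            return new DataTypeLike("LEGACY(" + info.name + ")");
        }
    }

    /** Hypothetical sink interface with the old and the new getter. */
    interface UpsertSinkLike {
        @Deprecated
        LegacyTypeInfo getRecordType();

        /** Default keeps existing sinks working without code changes. */
        default DataTypeLike getRecordDataType() {
            return DataTypeLike.fromLegacy(getRecordType());
        }
    }

    /** An existing sink that only implements the deprecated getter. */
    static final class OldSink implements UpsertSinkLike {
        @Override
        public LegacyTypeInfo getRecordType() {
            return new LegacyTypeInfo("SQL_TIMESTAMP");
        }
    }

    /** An upgraded sink that reports a precise type through the new getter. */
    static final class NewSink implements UpsertSinkLike {
        @Override
        public LegacyTypeInfo getRecordType() {
            return new LegacyTypeInfo("SQL_TIMESTAMP");
        }

        @Override
        public DataTypeLike getRecordDataType() {
            return new DataTypeLike("TIMESTAMP(9)");
        }
    }

    /** The framework side only ever asks for the new-style type. */
    static String resolve(UpsertSinkLike sink) {
        return sink.getRecordDataType().name;
    }

    public static void main(String[] args) {
        System.out.println(resolve(new OldSink()));
        System.out.println(resolve(new NewSink()));
    }
}
```

With this shape, old sinks keep compiling and still lose precision, while upgraded sinks can report TIMESTAMP(p) or DECIMAL(p,s) exactly.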

*Best Regards,*
*Zhenghua Gao*


On Mon, Feb 3, 2020 at 10:11 PM Jark Wu <[hidden email]> wrote:

> Thanks Zhenghua for starting this discussion.
>
> Currently, all the UpsertStreamTableSinks can't upgrade to the new type
> system which affects usability a lot.
> I hope we can fix that in 1.11.
>
> I'm find with *getRecordDataType* for a temporary solution.
> IIUC, the framework will only recognize getRecordDataType and
> ignore getConsumedDataType for UpsertStreamTableSink, is that right?
>
> I guess Timo are planning to design a new source/sink interface which will
> also fix this problem, but I'm not sure the timelines. cc @Timo
> It would be better if we can have a new and complete interface, because
> getRecordDataType is little confused as UpsertStreamTableSink already has
> three getXXXType().
>
> Best,
> Jark

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Timo Walther-2
Hi Zhenghua,

Jark is right. The reason why we haven't updated those interfaces yet is
that we would actually like to introduce new interfaces. We
should target new interfaces in this release. Even a short-term fix such as
the `getRecordDataType` you proposed does not actually help, as Jingsong
pointed out, because we cannot represent tuples in DataType and are also
not planning to support them natively, but only as a structured type in
the future.

In my envisioned design, the new sink interface should just always get a
`ChangeRow`, which is never serialized and is just a data structure for
communicating between the wrapping sink function and the sink
function returned by the table sink.
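
Purely as an illustration of that idea, and not the actual design (which is still to be written up), a `ChangeRow`-style hand-off could look roughly like the sketch below. The fields of `ChangeRow`, the `ChangeKind` enum, and the `WrappingSink` class are all assumptions made for this example; the point is only that the change flag travels inside an in-memory object instead of a serialized tuple type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustration only: a hypothetical shape, not a Flink API. */
final class ChangeRowSketch {

    /** Hypothetical change flag carried next to the row data. */
    enum ChangeKind { UPSERT, DELETE }

    /** In-memory carrier; it is never serialized in this sketch. */
    static final class ChangeRow {
        final ChangeKind kind;
        final Object[] fields;

        ChangeRow(ChangeKind kind, Object[] fields) {
            this.kind = kind;
            this.fields = fields;
        }
    }

    /** Wrapping function: attaches the change flag and forwards to the inner sink. */
    static final class WrappingSink {
        private final Consumer<ChangeRow> inner;

        WrappingSink(Consumer<ChangeRow> inner) {
            this.inner = inner;
        }

        void emit(boolean isUpsert, Object... fields) {
            ChangeKind kind = isUpsert ? ChangeKind.UPSERT : ChangeKind.DELETE;
            inner.accept(new ChangeRow(kind, fields));
        }
    }

    public static void main(String[] args) {
        // The "inner sink" here simply collects what it receives.
        List<ChangeRow> received = new ArrayList<>();
        WrappingSink sink = new WrappingSink(received::add);

        sink.emit(true, 1L, "alice");
        sink.emit(false, 1L, "alice");

        for (ChangeRow row : received) {
            System.out.println(row.kind + " with " + row.fields.length + " fields");
        }
    }
}
```

Because the flag lives in the carrier object, the sink would not need a tuple type in DataType at all.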

Let me sketch a rough design document that I will share with you
shortly. Then we could also discuss alternatives.

Thanks,
Timo


On 04.02.20 04:18, Zhenghua Gao wrote:

> Hi Jark, thanks for your comments.
>>>> IIUC, the framework will only recognize getRecordDataType and
>>>> ignore getConsumedDataType for UpsertStreamTableSink, is that right?
> Your are right.
>
>>>> getRecordDataType is little confused as UpsertStreamTableSink already has
>>>> three getXXXType().
> the getRecordType and getOutputType is deprecated and mainly for backward
> compatibility.
>
> *Best Regards,*
> *Zhenghua Gao*


Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Jark Wu-2
Cool! Looking forward to the design doc.

Best,
Jark

On Fri, 7 Feb 2020 at 17:26, Timo Walther <[hidden email]> wrote:

> Hi Zhenghua,
>
> Jark is right. The reason why we haven't updated those interfaces yet is
> because we are actually would like to introduce new interfaces. We
> should target new interfaces in this release. Even a short-term fix as
> you proposed with `getRecordDataType` does actually not help as Jingsong
> pointed out because we cannot represent tuples in DataType and are also
> not planning to support them natively but only as a structured type in
> the future.
>
> In my envisioned design, the new sink interface should just always get a
> `ChangeRow` which is never serialized and just a data structure for
> communicating between the wrapping sink function and the returned sink
> function by the table sink.
>
> Let me sketch a rough design document that I will share with you
> shortly. Then we could also discuss alternatives.
>
> Thanks,
> Timo

Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

Zhenghua Gao
In reply to this post by Timo Walther-2
Thanks Timo! Looking forward to your design!

*Best Regards,*
*Zhenghua Gao*


On Fri, Feb 7, 2020 at 5:26 PM Timo Walther <[hidden email]> wrote:

> Hi Zhenghua,
>
> Jark is right. The reason why we haven't updated those interfaces yet is
> because we are actually would like to introduce new interfaces. We
> should target new interfaces in this release. Even a short-term fix as
> you proposed with `getRecordDataType` does actually not help as Jingsong
> pointed out because we cannot represent tuples in DataType and are also
> not planning to support them natively but only as a structured type in
> the future.
>
> In my envisioned design, the new sink interface should just always get a
> `ChangeRow` which is never serialized and just a data structure for
> communicating between the wrapping sink function and the returned sink
> function by the table sink.
>
> Let me sketch a rough design document that I will share with you
> shortly. Then we could also discuss alternatives.
>
> Thanks,
> Timo