[DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Danny Chan
Thanks for the summary Timo ~

I want to clarify a little bit: what is the conclusion about fromChangelogStream and toChangelogStream? Should we use these names, or should we use fromDataStream with an optional ChangelogMode flag?

Best,
Danny Chan
在 2020年9月8日 +0800 PM8:22,Timo Walther <[hidden email]>,写道:

> Hi Danny,
>
> Your proposed signatures sound good to me.
>
> fromDataStream(dataStream, Schema)
> toDataStream(table, AbstractDataType<?>)
>
> They address all my concerns. The API would not be symmetric anymore,
> but this is fine with me. Others raised concerns about deprecating
> `fromDataStream(dataStream, Expression)`. Are they fine with this as well?
>
> If there are no objections, I would update the FLIP with the methods
> above. But let me briefly summarize my thoughts on this again, so that we
> are all on the same page:
> - The biggest discussion point seems the fromInsertStream/toInsertStream.
> - I don’t have a strong opinion on naming, from/toDataStream would be
> fine for me. But:
> - It has slightly different type mappings and might break existing pipelines
> silently. This point can be neglected as the differences are only minor.
> - We need a way of declaring the rowtime attribute but without declaring
> all columns again. Reduce manual schema work as much as possible.
> - Both Dawid and I dislike the current “position based” or “name based”
> expression logic that looks like a projection but is not.
> - Actually name based expressions are not necessary, since we have
> positions for all new data types.
> - Schema is not suitable to influence the output type for toDataStream.
> It should be DataType.
>
> All items are solved by Danny's suggestion.
>
> Regards,
> Timo
>
>
>
> On 08.09.20 14:04, Danny Chan wrote:
> > Hi, Timo ~
> >
> > "It is not about changelog mode compatibility, it is about the type
> > compatibility.”
> >
> > For fromDataStream(dataStream, Schema), there should not be compatibility problem or data type inconsistency. We know the logical type from Schema and physical type from the dataStream itself.
> >
> > For toDataStream(table, AbstractDataType<?>), we can get the logical type from the table itself
> > and the physical type from the passed data type.
> >
> > If both behaviors are deterministic, what's the problem with type compatibility and safety?
> >
> > My concern is that in most cases people use the "insert stream" and do not need to care about
> > the data stream's ChangelogMode, so there is no need to distinguish them in the APIs; an optional param is enough. If we introduce 2 new APIs there, people have to choose between them. Can fromChangelogStream()
> > accept an insert stream? What is the behavior if fromInsertStream() accepts a changelog stream?
> >
> >
> > "This means potentially duplicate definition of fields and their data types etc”
> >
> > I agree, because the table already has an underlying schema there.
> >
> > Best,
> > Danny Chan
> > 在 2020年9月3日 +0800 PM8:12,Timo Walther <[hidden email]>,写道:
> > > Hi Danny,
> > >
> > > "if ChangelogMode.INSERT is the default, existing pipelines should be
> > > compatible"
> > >
> > > It is not about changelog mode compatibility, it is about the type
> > > compatibility. The renaming to `toInsertStream` is only to have a means
> > > of dealing with data type inconsistencies that could break existing
> > > pipelines.
> > >
> > > As the FLIP describes, the following new behavior should be implemented:
> > >
> > > - It does this by translating the TypeInformation to DataType.
> > > - This will happen with a new TypeInfoDataTypeConverter that will no
> > > longer produce LegacyTypeInformationType.
> > > - All types from DataStream API should be supported by this converter.
> > > - TupleTypeInfoBase will be translated into a proper RowType or
> > > StructuredType.
> > > - BigDecimals will be converted to DECIMAL(38,18) by default.
> > > - Composite types (tuples, POJOs, rows) will be flattened by default if
> > > they are used as top-level records (similar to the old behavior).
> > > - The order of POJO fields is determined by the DataTypeExtractor and
> > > must not be defined manually anymore.
> > > - GenericTypeInfo is converted to RawType immediately by considering the
> > > current configuration.
> > > - A DataStream that originated from Table API will keep its DataType
> > > information due to ExternalTypeInfo implementing DataTypeQueryable.
> > >
> > > I would feel safer if we do this under a new method name.
> > >
> > > "toDataStream(table, schema.bindTo(DataType))"
> > >
> > > This is what I meant with "integrate the DataType into the Schema class
> > > itself". Yes, we can do that if everybody is fine with it. But why
> > > should a user specify both a schema and a data type? This means
> > > potentially duplicate definition of fields and their data types etc.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 03.09.20 11:31, Danny Chan wrote:
> > > > "It is a more conservative approach to introduce that in a
> > > > new method rather than changing the existing one under the hood and
> > > > potentially break existing pipelines silently”
> > > >
> > > > I like the idea actually, but if ChangelogMode.INSERT is the default, existing pipelines should be compatible. We can see the other kinds of ChangelogMode as an extension.
> > > >
> > > > “for `toDataStream` users need to be
> > > > able to express whether they would prefer Row, POJO or atomic”
> > > >
> > > > I think in most cases people do not need to convert the stream to a Row or POJO, because the table projection always returns a flattened internal row. If people do want a POJO there, how about we bind the DataType to the existing schema, like this:
> > > >
> > > > toDataStream(table, schema.bindTo(DataType))
> > > >
> > > > Best,
> > > > Danny Chan
> > > > 在 2020年9月3日 +0800 PM3:18,[hidden email],写道:
> > > > >
> > > > > It is a more conservative approach to introduce that in a
> > > > > new method rather than changing the existing one under the hood and
> > > > > potentially break existing pipelines silently
> > > >
> > >
> >
>
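
As a rough, assumption-based illustration of the converter behavior listed in the quoted FLIP summary above: a top-level POJO would be flattened into columns, and BigDecimal would map to DECIMAL(38, 18) by default. The Order class and the printed schema below are hypothetical.

// Hypothetical POJO and the schema the new TypeInfoDataTypeConverter is
// expected to derive for it.
public static class Order {
    public long id;                      // -> BIGINT
    public java.math.BigDecimal amount;  // -> DECIMAL(38, 18) by default
}

// tEnv.fromDataStream(orderStream).printSchema() should then print roughly:
// (
//   `id` BIGINT,
//   `amount` DECIMAL(38, 18)
// )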

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Jark Wu-2
Hi,

I'm +1 to use the naming of from/toDataStream, rather than
from/toInsertStream. So we don't need to deprecate the existing
`fromDataStream`.

I'm +1 to Danny's proposal: fromDataStream(dataStream, Schema) and
toDataStream(table, AbstractDataType<?>)
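
For readers following along, a minimal sketch of how these two signatures could look in user code. It assumes a StreamExecutionEnvironment `env`, a StreamTableEnvironment `tEnv`, and a POJO `Order`; the Schema builder and DataTypes.of(...) calls are illustrative placeholders rather than the final FLIP-136 API.

// Imports from org.apache.flink.table.api and the DataStream API omitted.
DataStream<Order> orders = env.fromElements(new Order(1L, "beer"));

// DataStream -> Table: logical types come from the Schema,
// physical types come from the stream's TypeInformation.
Table table = tEnv.fromDataStream(orders, Schema.newBuilder().build());

// Table -> DataStream: the requested DataType controls the physical output type.
DataStream<Order> result = tEnv.toDataStream(table, DataTypes.of(Order.class));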

I think we can also keep the method `createTemporaryView(path,
DataStream<T>)`.
I don't have a strong opinion on deprecating fromDataStream(datastream,
exprs), but slightly prefer to keep them.

Best,
Jark

On Wed, 9 Sep 2020 at 14:34, Danny Chan <[hidden email]> wrote:

> Thanks for the summary Timo ~
>
> I want to clarify a little bit, what is the conclusion about the
> fromChangelogStream and toChangelogStream, should we use this name or we
> use fromDataStream with an optional ChangelogMode flag ?
>
> Best,
> Danny Chan
> 在 2020年9月8日 +0800 PM8:22,Timo Walther <[hidden email]>,写道:
> > Hi Danny,
> >
> > Your proposed signatures sound good to me.
> >
> > fromDataStream(dataStream, Schema)
> > toDataStream(table, AbstractDataType<?>)
> >
> > They address all my concerns. The API would not be symmetric anymore,
> > but this is fine with me. Others raised concerns about deprecating
> > `fromDataStream(dataStream, Expression)`. Are they fine with this as
> well?
> >
> > If there are no objections, I would update the FLIP with the methods
> > above. Bu let me briefly summarize my thoughts on this again, so that we
> > are all on the same page:
> > - The biggest discussion point seems the fromInsertStream/toInsertStream.
> > - I don’t have a strong opinion on naming, from/toDataStream would be
> > fine for me. But:
> > - It slightly different type mappings and might break existing pipelines
> > silently. This point can be neglected as the differences are only minor.
> > - We need a way of declaring the rowtime attribute but without declaring
> > all columns again. Reduce manual schema work as much as possible.
> > - Both Dawid and I don’t like the current either “position based” or
> > “name based” expression logic that looks like a projection but is not.
> > - Actually name based expressions are not necessary, since we have
> > positions for all new data types.
> > - Schema is not suitable to influence the output type for toDataStream.
> > It should be DataType.
> >
> > All items are solved by Danny's suggestion.
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 08.09.20 14:04, Danny Chan wrote:
> > > Hi, Timo ~
> > >
> > > "It is not about changelog mode compatibility, it is about the type
> > > compatibility.”
> > >
> > > For fromDataStream(dataStream, Schema), there should not be
> compatibility problem or data type inconsistency. We know the logical type
> from Schema and physical type from the dataStream itself.
> > >
> > > For toDataStream(table, AbstractDataType<?>), we can get the logical
> type from the table itself
> > > and the physical type from the passed data type.
> > >
> > > If both behavior are deterministic, what's the problem for type
> compatibility and safety?
> > >
> > > My concern is that in most of the cases, people use the "insert
> stream", they do not need to care about
> > > the data stream ChangelogMode, so there is no need to distinguish them
> from the APIs, an optional param is enough. If we introduces 2 new API
> there, people have to choose between them, and can fromChangelogStream()
> > > accept an insert stream ? What is the behavior if fromInsertStream()
> accepts a changelog stream ?
> > >
> > >
> > > "This means potentially duplicate definition of fields and their data
> types etc”
> > >
> > > I agree, because table already has an underlying schema there.
> > >
> > > Best,
> > > Danny Chan
> > > 在 2020年9月3日 +0800 PM8:12,Timo Walther <[hidden email]>,写道:
> > > > Hi Danny,
> > > >
> > > > "if ChangelogMode.INSERT is the default, existing pipelines should be
> > > > compatible"
> > > >
> > > > It is not about changelog mode compatibility, it is about the type
> > > > compatibility. The renaming to `toInsertStream` is only to have a
> mean
> > > > of dealing with data type inconsistencies that could break existing
> > > > pipelines.
> > > >
> > > > As the FLIP describes, the following new behavior should be
> implemented:
> > > >
> > > > - It does this by translating the TypeInformation to DataType.
> > > > - This will happen with a new TypeInfoDataTypeConverter that will no
> > > > longer produce LegacyTypeInformationType.
> > > > - All types from DataStream API should be supported by this
> converter.
> > > > - TupleTypeInfoBase will be translated into a proper RowType or
> > > > StructuredType.
> > > > - BigDecimals will be converted to DECIMAL(38,18) by default.
> > > > - Composite types (tuples, POJOs, rows) will be flattened by default
> if
> > > > they are used as top-level records (similar to the old behavior).
> > > > - The order of POJO field's is determined by the DataTypeExtractor
> and
> > > > must not be defined manually anymore.
> > > > - GenericTypeInfo is converted to RawType immediately by considering
> the
> > > > current configuration.
> > > > - A DataStream that originated from Table API will keep its DataType
> > > > information due to ExternalTypeInfo implementing DataTypeQueryable.
> > > >
> > > > I would feel safer if we do this under a new method name.
> > > >
> > > > "toDataStream(table, schema.bindTo(DataType))"
> > > >
> > > > This is what I meant with "integrate the DataType into the Schema
> class
> > > > itself". Yes, we can do that if everybody is fine with it. But why
> > > > should a user specify both a schema and a data type? This means
> > > > potentially duplicate definition of fields and their data types etc.
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > >
> > > > On 03.09.20 11:31, Danny Chan wrote:
> > > > > "It is a more conservative approach to introduce that in a
> > > > > new method rather than changing the existing one under the hood and
> > > > > potentially break existing pipelines silently”
> > > > >
> > > > > I like the idea actually, but if ChangelogMode.INSERT is the
> default, existing pipelines should be compatible. We can see the other
> kinds of ChangelogMode as an extension.
> > > > >
> > > > > “for `toDataStream` users need to be
> > > > > able to express whether they would prefer Row, POJO or atomic”
> > > > >
> > > > > I think most of the cases people do not need to convert the stream
> to a Row or POJO, because the table projection always returns a flatternned
> internal row, if people did want a POJO there, how about we bind the
> DataType to the existing schema, like this
> > > > >
> > > > > toDataStream(table, schema.bindTo(DataType))
> > > > >
> > > > > Best,
> > > > > Danny Chan
> > > > > 在 2020年9月3日 +0800 PM3:18,[hidden email],写道:
> > > > > >
> > > > > > It is a more conservative approach to introduce that in a
> > > > > > new method rather than changing the existing one under the hood
> and
> > > > > > potentially break existing pipelines silently
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Timo Walther-2
In reply to this post by Danny Chan
The conclusion is that we will drop `fromChangelogStream(ChangelogMode,
DataStream<Row>)` but will keep `fromChangelogStream(DataStream<Row>)`.
The latter is necessary to have a per-record changeflag.

We could think about merging `fromChangelogStream`/`fromDataStream` and
`toChangelogStream`/`toDataStream`. But I think the planner needs to
know whether the input is insert-only or not. Esp. for bounded streams
this information will be useful in the future. Also, outputting a query
to a non-Row type for retraction queries doesn't make much sense if a
changeflag is missing.
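
A sketch of what this could look like (the fromChangelogStream signature is the one proposed here; Row.ofKind and RowKind are assumed to carry the per-record change flag):

// A changelog stream is a DataStream<Row> whose RowKind encodes the change flag.
DataStream<Row> changelog = env.fromElements(
    Row.ofKind(RowKind.INSERT, "alice", 12),
    Row.ofKind(RowKind.UPDATE_BEFORE, "alice", 12),
    Row.ofKind(RowKind.UPDATE_AFTER, "alice", 100));

// Interpreted as a changelog: the planner does not assume insert-only input.
Table changes = tEnv.fromChangelogStream(changelog);

// Insert-only input keeps using the plain conversion.
Table inserts = tEnv.fromDataStream(env.fromElements(Row.of("bob", 5)));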

What do you think?

Regards,
Timo

On 09.09.20 08:34, Danny Chan wrote:

> Thanks for the summary Timo ~
>
> I want to clarify a little bit, what is the conclusion about the fromChangelogStream and toChangelogStream, should we use this name or we use fromDataStream with an optional ChangelogMode flag ?
>
> Best,
> Danny Chan
> 在 2020年9月8日 +0800 PM8:22,Timo Walther <[hidden email]>,写道:
>> Hi Danny,
>>
>> Your proposed signatures sound good to me.
>>
>> fromDataStream(dataStream, Schema)
>> toDataStream(table, AbstractDataType<?>)
>>
>> They address all my concerns. The API would not be symmetric anymore,
>> but this is fine with me. Others raised concerns about deprecating
>> `fromDataStream(dataStream, Expression)`. Are they fine with this as well?
>>
>> If there are no objections, I would update the FLIP with the methods
>> above. Bu let me briefly summarize my thoughts on this again, so that we
>> are all on the same page:
>> - The biggest discussion point seems the fromInsertStream/toInsertStream.
>> - I don’t have a strong opinion on naming, from/toDataStream would be
>> fine for me. But:
>> - It slightly different type mappings and might break existing pipelines
>> silently. This point can be neglected as the differences are only minor.
>> - We need a way of declaring the rowtime attribute but without declaring
>> all columns again. Reduce manual schema work as much as possible.
>> - Both Dawid and I don’t like the current either “position based” or
>> “name based” expression logic that looks like a projection but is not.
>> - Actually name based expressions are not necessary, since we have
>> positions for all new data types.
>> - Schema is not suitable to influence the output type for toDataStream.
>> It should be DataType.
>>
>> All items are solved by Danny's suggestion.
>>
>> Regards,
>> Timo
>>
>>
>>
>> On 08.09.20 14:04, Danny Chan wrote:
>>> Hi, Timo ~
>>>
>>> "It is not about changelog mode compatibility, it is about the type
>>> compatibility.”
>>>
>>> For fromDataStream(dataStream, Schema), there should not be compatibility problem or data type inconsistency. We know the logical type from Schema and physical type from the dataStream itself.
>>>
>>> For toDataStream(table, AbstractDataType<?>), we can get the logical type from the table itself
>>> and the physical type from the passed data type.
>>>
>>> If both behavior are deterministic, what's the problem for type compatibility and safety?
>>>
>>> My concern is that in most of the cases, people use the "insert stream", they do not need to care about
>>> the data stream ChangelogMode, so there is no need to distinguish them from the APIs, an optional param is enough. If we introduces 2 new API there, people have to choose between them, and can fromChangelogStream()
>>> accept an insert stream ? What is the behavior if fromInsertStream() accepts a changelog stream ?
>>>
>>>
>>> "This means potentially duplicate definition of fields and their data types etc”
>>>
>>> I agree, because table already has an underlying schema there.
>>>
>>> Best,
>>> Danny Chan
>>> 在 2020年9月3日 +0800 PM8:12,Timo Walther <[hidden email]>,写道:
>>>> Hi Danny,
>>>>
>>>> "if ChangelogMode.INSERT is the default, existing pipelines should be
>>>> compatible"
>>>>
>>>> It is not about changelog mode compatibility, it is about the type
>>>> compatibility. The renaming to `toInsertStream` is only to have a mean
>>>> of dealing with data type inconsistencies that could break existing
>>>> pipelines.
>>>>
>>>> As the FLIP describes, the following new behavior should be implemented:
>>>>
>>>> - It does this by translating the TypeInformation to DataType.
>>>> - This will happen with a new TypeInfoDataTypeConverter that will no
>>>> longer produce LegacyTypeInformationType.
>>>> - All types from DataStream API should be supported by this converter.
>>>> - TupleTypeInfoBase will be translated into a proper RowType or
>>>> StructuredType.
>>>> - BigDecimals will be converted to DECIMAL(38,18) by default.
>>>> - Composite types (tuples, POJOs, rows) will be flattened by default if
>>>> they are used as top-level records (similar to the old behavior).
>>>> - The order of POJO field's is determined by the DataTypeExtractor and
>>>> must not be defined manually anymore.
>>>> - GenericTypeInfo is converted to RawType immediately by considering the
>>>> current configuration.
>>>> - A DataStream that originated from Table API will keep its DataType
>>>> information due to ExternalTypeInfo implementing DataTypeQueryable.
>>>>
>>>> I would feel safer if we do this under a new method name.
>>>>
>>>> "toDataStream(table, schema.bindTo(DataType))"
>>>>
>>>> This is what I meant with "integrate the DataType into the Schema class
>>>> itself". Yes, we can do that if everybody is fine with it. But why
>>>> should a user specify both a schema and a data type? This means
>>>> potentially duplicate definition of fields and their data types etc.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 03.09.20 11:31, Danny Chan wrote:
>>>>> "It is a more conservative approach to introduce that in a
>>>>> new method rather than changing the existing one under the hood and
>>>>> potentially break existing pipelines silently”
>>>>>
>>>>> I like the idea actually, but if ChangelogMode.INSERT is the default, existing pipelines should be compatible. We can see the other kinds of ChangelogMode as an extension.
>>>>>
>>>>> “for `toDataStream` users need to be
>>>>> able to express whether they would prefer Row, POJO or atomic”
>>>>>
>>>>> I think most of the cases people do not need to convert the stream to a Row or POJO, because the table projection always returns a flatternned internal row, if people did want a POJO there, how about we bind the DataType to the existing schema, like this
>>>>>
>>>>> toDataStream(table, schema.bindTo(DataType))
>>>>>
>>>>> Best,
>>>>> Danny Chan
>>>>> 在 2020年9月3日 +0800 PM3:18,[hidden email],写道:
>>>>>>
>>>>>> It is a more conservative approach to introduce that in a
>>>>>> new method rather than changing the existing one under the hood and
>>>>>> potentially break existing pipelines silently
>>>>>
>>>>
>>>
>>
>


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Danny Chan
“But I think the planner needs to
know whether the input is insert-only or not.”

Does fromDataStream(dataStream, schema, changelogMode)

solve your concerns? People can pass around whatever ChangelogMode they like as an optional param.
By default, fromDataStream(dataStream, schema) uses ChangelogMode INSERT.
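
In code, the alternative suggested here would look roughly like the following. This is shown only for comparison; the three-argument overload is hypothetical, while ChangelogMode.all() is an existing factory method.

// Default: insert-only semantics.
Table inserts = tEnv.fromDataStream(rowStream, schema);

// Optional flag: treat the same stream as a full changelog (hypothetical overload).
Table changes = tEnv.fromDataStream(rowStream, schema, ChangelogMode.all());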

Best,
Danny Chan
在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
>
> But I think the planner needs to
> know whether the input is insert-only or not.

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Timo Walther-2
I had this in the initial design, but Jark had concerns at least for the
`toChangelogStream(ChangelogMode)` (see earlier discussion).

`fromDataStream(dataStream, schema, changelogMode)` would be possible.

But in this case I would vote for a symmetric API. If we keep
toChangelogStream we should also have a fromChangelogStream.

And if we unify `toChangelogStream` and `toDataStream`, retractions
cannot be represented for non-Rows and users will experience duplicate
records with a missing changeflag.

Regards,
Timo


On 09.09.20 09:31, Danny Chan wrote:

> “But I think the planner needs to
> know whether the input is insert-only or not.”
>
> Does fromDataStream(dataStream, schema, changelogMode)
>
> solve your concerns ?  People can pass around whatever ChangelogMode they like as an optional param.
> By default: fromDataStream(dataStream, schema), the ChangelogMode is INSERT.
>
> Best,
> Danny Chan
> 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
>>
>> But I think the planner needs to
>> know whether the input is insert-only or not.
>


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Jark Wu-2
I prefer to have separate APIs for them, as a changelog stream requires the Row
type.
It would make the API more straightforward and reduce confusion.

Best,
Jark

On Wed, 9 Sep 2020 at 16:21, Timo Walther <[hidden email]> wrote:

> I had this in the inital design, but Jark had concerns at least for the
> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>
> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>
> But in this case I would vote for a symmetric API. If we keep
> toChangelogStream we should also have a fromChangelogStream.
>
> And if we unify `toChangelogStream` and `toDataStream`, retractions
> cannot be represented for non-Rows and users will experience duplicate
> records with a missing changeflag.
>
> Regards,
> Timo
>
>
> On 09.09.20 09:31, Danny Chan wrote:
> > “But I think the planner needs to
> > know whether the input is insert-only or not.”
> >
> > Does fromDataStream(dataStream, schema, changelogMode)
> >
> > solve your concerns ?  People can pass around whatever ChangelogMode
> they like as an optional param.
> > By default: fromDataStream(dataStream, schema), the ChangelogMode is
> INSERT.
> >
> > Best,
> > Danny Chan
> > 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
> >>
> >> But I think the planner needs to
> >> know whether the input is insert-only or not.
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Danny Chan
In reply to this post by Timo Walther-2
I think a different API name just because the DataStream generic type is different would bring in much confusion.
If there are ChangelogModes that only work for Row, can we have a type check there?

Switching to a new API name does not really solve the problem well; people still need to declare the ChangelogMode explicitly, and there are some confusions:

• Should a DataStream of Row type always use #fromChangelogStream?
• Does fromChangelogStream work only for the INSERT ChangelogMode?


Best,
Danny Chan
在 2020年9月9日 +0800 PM4:21,Timo Walther <[hidden email]>,写道:

> I had this in the inital design, but Jark had concerns at least for the
> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>
> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>
> But in this case I would vote for a symmetric API. If we keep
> toChangelogStream we should also have a fromChangelogStream.
>
> And if we unify `toChangelogStream` and `toDataStream`, retractions
> cannot be represented for non-Rows and users will experience duplicate
> records with a missing changeflag.
>
> Regards,
> Timo
>
>
> On 09.09.20 09:31, Danny Chan wrote:
> > “But I think the planner needs to
> > know whether the input is insert-only or not.”
> >
> > Does fromDataStream(dataStream, schema, changelogMode)
> >
> > solve your concerns ?  People can pass around whatever ChangelogMode they like as an optional param.
> > By default: fromDataStream(dataStream, schema), the ChangelogMode is INSERT.
> >
> > Best,
> > Danny Chan
> > 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
> > >
> > > But I think the planner needs to
> > > know whether the input is insert-only or not.
> >
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Timo Walther-2
I agree with Jark. It reduces confusion.

The DataStream API doesn't know changelog processing at all. A
DataStream of Row can be used with both `fromDataStream` and
`fromChangelogStream`. But only the latter API will interpret it as a
changelog.

And as I mentioned before, the `toChangelogStream` must work with Row
otherwise users are confused due to duplicate records with a missing
changeflag.
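
A compact sketch of the resulting split on the output side (method shapes as discussed in this thread, not final; aggregatedTable, appendOnlyTable and Order are assumed):

// Row-based variant: every record keeps its RowKind change flag,
// so retractions from an updating query stay distinguishable.
DataStream<Row> updatingResult = tEnv.toChangelogStream(aggregatedTable);

// Insert-only variant: may target arbitrary types such as POJOs,
// but only makes sense for append-only queries.
DataStream<Order> appendOnlyResult =
    tEnv.toDataStream(appendOnlyTable, DataTypes.of(Order.class));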

I will update FLIP-136 one last time. I hope we can then continue to a
vote.

Regards,
Timo


On 09.09.20 10:50, Danny Chan wrote:

> I think it would bring in much confusion by a different API name just because the DataStream generic type is different.
> If there are ChangelogMode that only works for Row, can we have a type check there ?
>
> Switch to a new API name does not really solve the problem well, people still need to declare the ChangelogMode explicitly, and there are some confusions:
>
> • Should DataStream of Row type always use #fromChangelogStream ?
> • Does fromChangelogStream works for only INSERT ChangelogMode ?
>
>
> Best,
> Danny Chan
> 在 2020年9月9日 +0800 PM4:21,Timo Walther <[hidden email]>,写道:
>> I had this in the inital design, but Jark had concerns at least for the
>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>
>> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>>
>> But in this case I would vote for a symmetric API. If we keep
>> toChangelogStream we should also have a fromChangelogStream.
>>
>> And if we unify `toChangelogStream` and `toDataStream`, retractions
>> cannot be represented for non-Rows and users will experience duplicate
>> records with a missing changeflag.
>>
>> Regards,
>> Timo
>>
>>
>> On 09.09.20 09:31, Danny Chan wrote:
>>> “But I think the planner needs to
>>> know whether the input is insert-only or not.”
>>>
>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>
>>> solve your concerns ?  People can pass around whatever ChangelogMode they like as an optional param.
>>> By default: fromDataStream(dataStream, schema), the ChangelogMode is INSERT.
>>>
>>> Best,
>>> Danny Chan
>>> 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
>>>>
>>>> But I think the planner needs to
>>>> know whether the input is insert-only or not.
>>>
>>
>


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Danny Chan-2
Thanks, I'm fine with that.


Timo Walther <[hidden email]> 于2020年9月9日周三 下午7:02写道:

> I agree with Jark. It reduces confusion.
>
> The DataStream API doesn't know changelog processing at all. A
> DataStream of Row can be used with both `fromDataStream` and
> `fromChangelogStream`. But only the latter API will interpret it as a
> changelog something.
>
> And as I mentioned before, the `toChangelogStream` must work with Row
> otherwise users are confused due to duplicate records with a missing
> changeflag.
>
> I will update the FLIP-136 a last time. I hope we can then continue to a
> vote.
>
> Regards,
> Timo
>
>
> On 09.09.20 10:50, Danny Chan wrote:
> > I think it would bring in much confusion by a different API name just
> because the DataStream generic type is different.
> > If there are ChangelogMode that only works for Row, can we have a type
> check there ?
> >
> > Switch to a new API name does not really solve the problem well, people
> still need to declare the ChangelogMode explicitly, and there are some
> confusions:
> >
> > • Should DataStream of Row type always use #fromChangelogStream ?
> > • Does fromChangelogStream works for only INSERT ChangelogMode ?
> >
> >
> > Best,
> > Danny Chan
> > 在 2020年9月9日 +0800 PM4:21,Timo Walther <[hidden email]>,写道:
> >> I had this in the inital design, but Jark had concerns at least for the
> >> `toChangelogStream(ChangelogMode)` (see earlier discussion).
> >>
> >> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
> >>
> >> But in this case I would vote for a symmetric API. If we keep
> >> toChangelogStream we should also have a fromChangelogStream.
> >>
> >> And if we unify `toChangelogStream` and `toDataStream`, retractions
> >> cannot be represented for non-Rows and users will experience duplicate
> >> records with a missing changeflag.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 09.09.20 09:31, Danny Chan wrote:
> >>> “But I think the planner needs to
> >>> know whether the input is insert-only or not.”
> >>>
> >>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>
> >>> solve your concerns ?  People can pass around whatever ChangelogMode
> they like as an optional param.
> >>> By default: fromDataStream(dataStream, schema), the ChangelogMode is
> INSERT.
> >>>
> >>> Best,
> >>> Danny Chan
> >>> 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
> >>>>
> >>>> But I think the planner needs to
> >>>> know whether the input is insert-only or not.
> >>>
> >>
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Timo Walther-2
Thanks everyone for this healthy discussion. I updated the FLIP with the
outcome. I think the result is one of the last core API refactorings, and
users will be happy to have consistent changelog support. Thanks for
all the contributions.

If there are no objections, I would continue with a vote.

What do you think?

Regards,
Timo


On 09.09.20 14:31, Danny Chan wrote:

> Thanks, i'm fine with that.
>
>
> Timo Walther <[hidden email]> 于2020年9月9日周三 下午7:02写道:
>
>> I agree with Jark. It reduces confusion.
>>
>> The DataStream API doesn't know changelog processing at all. A
>> DataStream of Row can be used with both `fromDataStream` and
>> `fromChangelogStream`. But only the latter API will interpret it as a
>> changelog something.
>>
>> And as I mentioned before, the `toChangelogStream` must work with Row
>> otherwise users are confused due to duplicate records with a missing
>> changeflag.
>>
>> I will update the FLIP-136 a last time. I hope we can then continue to a
>> vote.
>>
>> Regards,
>> Timo
>>
>>
>> On 09.09.20 10:50, Danny Chan wrote:
>>> I think it would bring in much confusion by a different API name just
>> because the DataStream generic type is different.
>>> If there are ChangelogMode that only works for Row, can we have a type
>> check there ?
>>>
>>> Switch to a new API name does not really solve the problem well, people
>> still need to declare the ChangelogMode explicitly, and there are some
>> confusions:
>>>
>>> • Should DataStream of Row type always use #fromChangelogStream ?
>>> • Does fromChangelogStream works for only INSERT ChangelogMode ?
>>>
>>>
>>> Best,
>>> Danny Chan
>>> 在 2020年9月9日 +0800 PM4:21,Timo Walther <[hidden email]>,写道:
>>>> I had this in the inital design, but Jark had concerns at least for the
>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>>>
>>>> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>>>>
>>>> But in this case I would vote for a symmetric API. If we keep
>>>> toChangelogStream we should also have a fromChangelogStream.
>>>>
>>>> And if we unify `toChangelogStream` and `toDataStream`, retractions
>>>> cannot be represented for non-Rows and users will experience duplicate
>>>> records with a missing changeflag.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 09.09.20 09:31, Danny Chan wrote:
>>>>> “But I think the planner needs to
>>>>> know whether the input is insert-only or not.”
>>>>>
>>>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>>>
>>>>> solve your concerns ?  People can pass around whatever ChangelogMode
>> they like as an optional param.
>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode is
>> INSERT.
>>>>>
>>>>> Best,
>>>>> Danny Chan
>>>>> 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
>>>>>>
>>>>>> But I think the planner needs to
>>>>>> know whether the input is insert-only or not.
>>>>>
>>>>
>>>
>>
>>
>


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Danny Chan
Thanks for driving this Timo, +1 for voting ~

Best,
Danny Chan
在 2020年9月10日 +0800 PM3:54,Timo Walther <[hidden email]>,写道:

> Thanks everyone for this healthy discussion. I updated the FLIP with the
> outcome. I think the result is one of the last core API refactoring and
> users will be happy to have a consistent changelog support. Thanks for
> all the contributions.
>
> If there are no objections, I would continue with a voting.
>
> What do you think?
>
> Regards,
> Timo
>
>
> On 09.09.20 14:31, Danny Chan wrote:
> > Thanks, i'm fine with that.
> >
> >
> > Timo Walther <[hidden email]> 于2020年9月9日周三 下午7:02写道:
> >
> > > I agree with Jark. It reduces confusion.
> > >
> > > The DataStream API doesn't know changelog processing at all. A
> > > DataStream of Row can be used with both `fromDataStream` and
> > > `fromChangelogStream`. But only the latter API will interpret it as a
> > > changelog something.
> > >
> > > And as I mentioned before, the `toChangelogStream` must work with Row
> > > otherwise users are confused due to duplicate records with a missing
> > > changeflag.
> > >
> > > I will update the FLIP-136 a last time. I hope we can then continue to a
> > > vote.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 09.09.20 10:50, Danny Chan wrote:
> > > > I think it would bring in much confusion by a different API name just
> > > because the DataStream generic type is different.
> > > > If there are ChangelogMode that only works for Row, can we have a type
> > > check there ?
> > > >
> > > > Switch to a new API name does not really solve the problem well, people
> > > still need to declare the ChangelogMode explicitly, and there are some
> > > confusions:
> > > >
> > > > • Should DataStream of Row type always use #fromChangelogStream ?
> > > > • Does fromChangelogStream works for only INSERT ChangelogMode ?
> > > >
> > > >
> > > > Best,
> > > > Danny Chan
> > > > 在 2020年9月9日 +0800 PM4:21,Timo Walther <[hidden email]>,写道:
> > > > > I had this in the inital design, but Jark had concerns at least for the
> > > > > `toChangelogStream(ChangelogMode)` (see earlier discussion).
> > > > >
> > > > > `fromDataStream(dataStream, schema, changelogMode)` would be possible.
> > > > >
> > > > > But in this case I would vote for a symmetric API. If we keep
> > > > > toChangelogStream we should also have a fromChangelogStream.
> > > > >
> > > > > And if we unify `toChangelogStream` and `toDataStream`, retractions
> > > > > cannot be represented for non-Rows and users will experience duplicate
> > > > > records with a missing changeflag.
> > > > >
> > > > > Regards,
> > > > > Timo
> > > > >
> > > > >
> > > > > On 09.09.20 09:31, Danny Chan wrote:
> > > > > > “But I think the planner needs to
> > > > > > know whether the input is insert-only or not.”
> > > > > >
> > > > > > Does fromDataStream(dataStream, schema, changelogMode)
> > > > > >
> > > > > > solve your concerns ? People can pass around whatever ChangelogMode
> > > they like as an optional param.
> > > > > > By default: fromDataStream(dataStream, schema), the ChangelogMode is
> > > INSERT.
> > > > > >
> > > > > > Best,
> > > > > > Danny Chan
> > > > > > 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
> > > > > > >
> > > > > > > But I think the planner needs to
> > > > > > > know whether the input is insert-only or not.
> > > > > >
> > > > >
> > > >
> > >
> > >
> >
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Timo Walther-2
Hi everyone,

thanks for all the feedback. I updated the FLIP again on Thursday to
integrate the feedback I got from Jingsong and Jark offline. In
particular I updated the `Improve dealing with Row in DataStream API`
section another time. We introduced static methods for Row that should
make the semantics clear to users:

// allows to use index-based setters and getters (equivalent to new Row(int));
// method exists for completeness
public static Row withPositions(int length);

// allows to use name-based setters and getters
public static Row withNames();

// allows to use both name-based and position-based setters and getters
public static Row withNamesAndPositions(Map<String, Integer> fieldNames);

In any case, none of the existing methods will be deprecated and only
additional functionality will be available through the methods above.
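
A short usage sketch of the factory methods above (based on the proposed semantics, not on shipped code):

// Name-based mode: fields are addressed purely by name.
Row named = Row.withNames();
named.setField("myField", 12);
named.setField("myOtherField", "This is a test");

// Position-based mode: equivalent to new Row(2).
Row positioned = Row.withPositions(2);
positioned.setField(0, 12);
positioned.setField(1, "This is a test");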

I started a voting thread on Friday. Please feel free to vote.

Regards,
Timo

On 10.09.20 10:21, Danny Chan wrote:

> Thanks for driving this Timo, +1 for voting ~
>
> Best,
> Danny Chan
> 在 2020年9月10日 +0800 PM3:54,Timo Walther <[hidden email]>,写道:
>> Thanks everyone for this healthy discussion. I updated the FLIP with the
>> outcome. I think the result is one of the last core API refactoring and
>> users will be happy to have a consistent changelog support. Thanks for
>> all the contributions.
>>
>> If there are no objections, I would continue with a voting.
>>
>> What do you think?
>>
>> Regards,
>> Timo
>>
>>
>> On 09.09.20 14:31, Danny Chan wrote:
>>> Thanks, i'm fine with that.
>>>
>>>
>>> Timo Walther <[hidden email]> 于2020年9月9日周三 下午7:02写道:
>>>
>>>> I agree with Jark. It reduces confusion.
>>>>
>>>> The DataStream API doesn't know changelog processing at all. A
>>>> DataStream of Row can be used with both `fromDataStream` and
>>>> `fromChangelogStream`. But only the latter API will interpret it as a
>>>> changelog something.
>>>>
>>>> And as I mentioned before, the `toChangelogStream` must work with Row
>>>> otherwise users are confused due to duplicate records with a missing
>>>> changeflag.
>>>>
>>>> I will update the FLIP-136 a last time. I hope we can then continue to a
>>>> vote.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 09.09.20 10:50, Danny Chan wrote:
>>>>> I think it would bring in much confusion by a different API name just
>>>> because the DataStream generic type is different.
>>>>> If there are ChangelogMode that only works for Row, can we have a type
>>>> check there ?
>>>>>
>>>>> Switch to a new API name does not really solve the problem well, people
>>>> still need to declare the ChangelogMode explicitly, and there are some
>>>> confusions:
>>>>>
>>>>> • Should DataStream of Row type always use #fromChangelogStream ?
>>>>> • Does fromChangelogStream works for only INSERT ChangelogMode ?
>>>>>
>>>>>
>>>>> Best,
>>>>> Danny Chan
>>>>> 在 2020年9月9日 +0800 PM4:21,Timo Walther <[hidden email]>,写道:
>>>>>> I had this in the inital design, but Jark had concerns at least for the
>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>>>>>
>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be possible.
>>>>>>
>>>>>> But in this case I would vote for a symmetric API. If we keep
>>>>>> toChangelogStream we should also have a fromChangelogStream.
>>>>>>
>>>>>> And if we unify `toChangelogStream` and `toDataStream`, retractions
>>>>>> cannot be represented for non-Rows and users will experience duplicate
>>>>>> records with a missing changeflag.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 09.09.20 09:31, Danny Chan wrote:
>>>>>>> “But I think the planner needs to
>>>>>>> know whether the input is insert-only or not.”
>>>>>>>
>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>>>>>
>>>>>>> solve your concerns ? People can pass around whatever ChangelogMode
>>>> they like as an optional param.
>>>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode is
>>>> INSERT.
>>>>>>>
>>>>>>> Best,
>>>>>>> Danny Chan
>>>>>>> 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
>>>>>>>>
>>>>>>>> But I think the planner needs to
>>>>>>>> know whether the input is insert-only or not.
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Jark Wu-2
Personally I think the fieldNames Map is confusing and not handy.
I just have an idea but not sure what you think.
What about adding a new constructor with a List of field names? This enables all
name-based setters/getters.
Regarding the List -> Map conversion cost for every record, we can suggest that users
reuse the Row in the task.

new Row(int arity)
new Row(List<String> fieldNames)

final Row reuse = new Row(Arrays.asList("myField", "myOtherField"));
reuse.setField("myField", 12);
reuse.setField("myOtherField", "This is a test");

My point is that if we can have a handy constructor for a named Row, we may
not need to distinguish between the named-only and positionAndNamed modes.
This can avoid (by failing fast) the potential problem of setting an invalid
field.

We can also come up with a new class for the field names which will
construct the Map and be shared among all Row instances.
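
A possible shape for such a shared field-name class (purely hypothetical, not an existing Flink class):

import java.util.HashMap;
import java.util.Map;

// Builds the name -> position map once so it can be shared by many Row instances.
public final class RowNames {
    private final Map<String, Integer> positions = new HashMap<>();

    public RowNames(String... names) {
        for (int i = 0; i < names.length; i++) {
            positions.put(names[i], i);
        }
    }

    // Assumes the name exists; callers would validate in a real implementation.
    public int positionOf(String name) {
        return positions.get(name);
    }
}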

What do you think?

Best,
Jark

On Thu, 17 Sep 2020 at 16:48, Timo Walther <[hidden email]> wrote:

> Hi everyone,
>
> thanks for all the feedback. I updated the FLIP again on Thursday to
> integrate the feedback I got from Jingsong and Jark offline. In
> particular I updated the `Improve dealing with Row in DataStream API`
> section another time. We introduced static methods for Row that should
> make the semantics clear to users:
>
> // allows to use index-based setters and getters (equivalent to new
> Row(int))
> // method exists for completeness
> public static withPositions(int length);
>
> // allows to use name-based setters and getters
> public static withNames();
>
> // allows to use both name-based and position-based setters and getters
> public static withNamesAndPositions(Map<String, Integer> fieldNames);
>
> In any case, non of the existing methods will be deprecated and only
> additional functionality will be available through the methods above.
>
> I started a voting thread on Friday. Please feel free to vote.
>
> Regards,
> Timo
>
> On 10.09.20 10:21, Danny Chan wrote:
> > Thanks for driving this Timo, +1 for voting ~
> >
> > Best,
> > Danny Chan
> > 在 2020年9月10日 +0800 PM3:54,Timo Walther <[hidden email]>,写道:
> >> Thanks everyone for this healthy discussion. I updated the FLIP with the
> >> outcome. I think the result is one of the last core API refactoring and
> >> users will be happy to have a consistent changelog support. Thanks for
> >> all the contributions.
> >>
> >> If there are no objections, I would continue with a voting.
> >>
> >> What do you think?
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 09.09.20 14:31, Danny Chan wrote:
> >>> Thanks, i'm fine with that.
> >>>
> >>>
> >>> Timo Walther <[hidden email]> 于2020年9月9日周三 下午7:02写道:
> >>>
> >>>> I agree with Jark. It reduces confusion.
> >>>>
> >>>> The DataStream API doesn't know changelog processing at all. A
> >>>> DataStream of Row can be used with both `fromDataStream` and
> >>>> `fromChangelogStream`. But only the latter API will interpret it as a
> >>>> changelog something.
> >>>>
> >>>> And as I mentioned before, the `toChangelogStream` must work with Row
> >>>> otherwise users are confused due to duplicate records with a missing
> >>>> changeflag.
> >>>>
> >>>> I will update the FLIP-136 a last time. I hope we can then continue
> to a
> >>>> vote.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 09.09.20 10:50, Danny Chan wrote:
> >>>>> I think it would bring in much confusion by a different API name just
> >>>> because the DataStream generic type is different.
> >>>>> If there are ChangelogMode that only works for Row, can we have a
> type
> >>>> check there ?
> >>>>>
> >>>>> Switch to a new API name does not really solve the problem well,
> people
> >>>> still need to declare the ChangelogMode explicitly, and there are some
> >>>> confusions:
> >>>>>
> >>>>> • Should DataStream of Row type always use #fromChangelogStream ?
> >>>>> • Does fromChangelogStream works for only INSERT ChangelogMode ?
> >>>>>
> >>>>>
> >>>>> Best,
> >>>>> Danny Chan
> >>>>> 在 2020年9月9日 +0800 PM4:21,Timo Walther <[hidden email]>,写道:
> >>>>>> I had this in the inital design, but Jark had concerns at least for
> the
> >>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
> >>>>>>
> >>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
> possible.
> >>>>>>
> >>>>>> But in this case I would vote for a symmetric API. If we keep
> >>>>>> toChangelogStream we should also have a fromChangelogStream.
> >>>>>>
> >>>>>> And if we unify `toChangelogStream` and `toDataStream`, retractions
> >>>>>> cannot be represented for non-Rows and users will experience
> duplicate
> >>>>>> records with a missing changeflag.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 09.09.20 09:31, Danny Chan wrote:
> >>>>>>> “But I think the planner needs to
> >>>>>>> know whether the input is insert-only or not.”
> >>>>>>>
> >>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>>>>>
> >>>>>>> solve your concerns ? People can pass around whatever ChangelogMode
> >>>> they like as an optional param.
> >>>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode
> is
> >>>> INSERT.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Danny Chan
> >>>>>>> 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
> >>>>>>>>
> >>>>>>>> But I think the planner needs to
> >>>>>>>> know whether the input is insert-only or not.
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Timo Walther-2
Hi Jark,

the fieldNames map is not intended for users. I would also be fine with
making it a default-scope constructor and accessing it through some internal
utility class next to the Row class. The fieldNames map must only be
used by serializers and converters. A user has no benefit from using it.

For the creation of new rows (without reusing, which only advanced users
usually do), I don't see a benefit of having:

final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
reuse.setField("myField", 12);
reuse.setField("myOtherField", "This is a test");

The purpose of Row.withNames() is to create a Row easily and readably
without declaring 50+ column names or dealing with indices in that range.

Personally, I would like to make Row an interface and have concrete row
implementations for different purposes but this would break existing
programs too much.

What do you think?

Regards,
Timo


On 18.09.20 11:04, Jark Wu wrote:

> Personally I think the fieldNames Map is confusing and not handy.
> I just have an idea but not sure what you think.
> What about adding a new constructor with List field names, this enables all
> name-based setter/getters.
> Regarding to List -> Map cost for every record, we can suggest users to
> reuse the Row in the task.
>
> new Row(int arity)
> new Row(List<String> fieldNames)
>
> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> reuse.setField("myField", 12);
> reuse.setField("myOtherField", "This is a test");
>
> My point is that, if we can have a handy constructor for named Row, we may
> not need to distinguish the named-only or positionAndNamed mode.
> This can avoid (fast-fail) the potential problem when setting an invalid
> field.
>
> We can also come up with a new class for the field names which will
> construct the Map and be shared among all Row instances.
>
> What do you think?
>
> Best,
> Jark
>
> On Thu, 17 Sep 2020 at 16:48, Timo Walther <[hidden email]> wrote:
>
>> Hi everyone,
>>
>> thanks for all the feedback. I updated the FLIP again on Thursday to
>> integrate the feedback I got from Jingsong and Jark offline. In
>> particular I updated the `Improve dealing with Row in DataStream API`
>> section another time. We introduced static methods for Row that should
>> make the semantics clear to users:
>>
>> // allows to use index-based setters and getters (equivalent to new
>> Row(int))
>> // method exists for completeness
>> public static withPositions(int length);
>>
>> // allows to use name-based setters and getters
>> public static withNames();
>>
>> // allows to use both name-based and position-based setters and getters
>> public static withNamesAndPositions(Map<String, Integer> fieldNames);
>>
>> In any case, non of the existing methods will be deprecated and only
>> additional functionality will be available through the methods above.
>>
>> I started a voting thread on Friday. Please feel free to vote.
>>
>> Regards,
>> Timo
>>
>> On 10.09.20 10:21, Danny Chan wrote:
>>> Thanks for driving this Timo, +1 for voting ~
>>>
>>> Best,
>>> Danny Chan
>>> 在 2020年9月10日 +0800 PM3:54,Timo Walther <[hidden email]>,写道:
>>>> Thanks everyone for this healthy discussion. I updated the FLIP with the
>>>> outcome. I think the result is one of the last core API refactoring and
>>>> users will be happy to have a consistent changelog support. Thanks for
>>>> all the contributions.
>>>>
>>>> If there are no objections, I would continue with a voting.
>>>>
>>>> What do you think?
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 09.09.20 14:31, Danny Chan wrote:
>>>>> Thanks, i'm fine with that.
>>>>>
>>>>>
>>>>> Timo Walther <[hidden email]> 于2020年9月9日周三 下午7:02写道:
>>>>>
>>>>>> I agree with Jark. It reduces confusion.
>>>>>>
>>>>>> The DataStream API doesn't know changelog processing at all. A
>>>>>> DataStream of Row can be used with both `fromDataStream` and
>>>>>> `fromChangelogStream`. But only the latter API will interpret it as a
>>>>>> changelog something.
>>>>>>
>>>>>> And as I mentioned before, the `toChangelogStream` must work with Row
>>>>>> otherwise users are confused due to duplicate records with a missing
>>>>>> changeflag.
>>>>>>
>>>>>> I will update the FLIP-136 a last time. I hope we can then continue
>> to a
>>>>>> vote.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 09.09.20 10:50, Danny Chan wrote:
>>>>>>> I think it would bring in much confusion by a different API name just
>>>>>> because the DataStream generic type is different.
>>>>>>> If there are ChangelogMode that only works for Row, can we have a
>> type
>>>>>> check there ?
>>>>>>>
>>>>>>> Switch to a new API name does not really solve the problem well,
>> people
>>>>>> still need to declare the ChangelogMode explicitly, and there are some
>>>>>> confusions:
>>>>>>>
>>>>>>> • Should DataStream of Row type always use #fromChangelogStream ?
>>>>>>> • Does fromChangelogStream works for only INSERT ChangelogMode ?
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Danny Chan
>>>>>>> 在 2020年9月9日 +0800 PM4:21,Timo Walther <[hidden email]>,写道:
>>>>>>>> I had this in the inital design, but Jark had concerns at least for
>> the
>>>>>>>> `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>>>>>>>
>>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
>> possible.
>>>>>>>>
>>>>>>>> But in this case I would vote for a symmetric API. If we keep
>>>>>>>> toChangelogStream we should also have a fromChangelogStream.
>>>>>>>>
>>>>>>>> And if we unify `toChangelogStream` and `toDataStream`, retractions
>>>>>>>> cannot be represented for non-Rows and users will experience
>> duplicate
>>>>>>>> records with a missing changeflag.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>>
>>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
>>>>>>>>> “But I think the planner needs to
>>>>>>>>> know whether the input is insert-only or not.”
>>>>>>>>>
>>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>>>>>>>
>>>>>>>>> solve your concerns ? People can pass around whatever ChangelogMode
>>>>>> they like as an optional param.
>>>>>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode
>> is
>>>>>> INSERT.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Danny Chan
>>>>>>>>> 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
>>>>>>>>>>
>>>>>>>>>> But I think the planner needs to
>>>>>>>>>> know whether the input is insert-only or not.
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Jark Wu-2
Hi Timo,

Sorry for the late reply.
I think it would be great if we can make `withNamesAndPositions` internally
visible. This reduces the complexity of the public API.
It's hard to come up with a perfect solution, so let's move on with this FLIP.
I don't have other concerns.

Best,
Jark

On Fri, 18 Sep 2020 at 22:14, Timo Walther <[hidden email]> wrote:

> Hi Jark,
>
> the fieldNames map is not intended for users. I would also be fine to
> make it a default scope constructor and access it with some internal
> utility class next to the Row class. The fieldNames map must only be
> used by serializers and converters. A user has no benefit in using it.
>
> For the creation of new rows (without reusing, which only advanced users
> usually do), I don't see a benefit of having:
>
> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> reuse.setField("myField", 12);
> reuse.setField("myOtherField", "This is a test");
>
> The purpose of Row.withName() is too create a Row easily and readable
> without declaring 50+ column names or dealing with indices in this range.
>
> Personally, I would like to make Row an interface and have concrete row
> implementations for different purposes but this would break existing
> programs too much.
>
> What do you think?
>
> Regards,
> Timo
>
>
> On 18.09.20 11:04, Jark Wu wrote:
> > Personally I think the fieldNames Map is confusing and not handy.
> > I just have an idea but not sure what you think.
> > What about adding a new constructor with List field names, this enables
> all
> > name-based setter/getters.
> > Regarding to List -> Map cost for every record, we can suggest users to
> > reuse the Row in the task.
> >
> > new Row(int arity)
> > new Row(List<String> fieldNames)
> >
> > final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> > reuse.setField("myField", 12);
> > reuse.setField("myOtherField", "This is a test");
> >
> > My point is that, if we can have a handy constructor for named Row, we
> may
> > not need to distinguish the named-only or positionAndNamed mode.
> > This can avoid (fast-fail) the potential problem when setting an invalid
> > field.
> >
> > We can also come up with a new class for the field names which will
> > construct the Map and be shared among all Row instances.
> >
> > What do you think?
> >
> > Best,
> > Jark
> >
> > On Thu, 17 Sep 2020 at 16:48, Timo Walther <[hidden email]> wrote:
> >
> >> Hi everyone,
> >>
> >> thanks for all the feedback. I updated the FLIP again on Thursday to
> >> integrate the feedback I got from Jingsong and Jark offline. In
> >> particular I updated the `Improve dealing with Row in DataStream API`
> >> section another time. We introduced static methods for Row that should
> >> make the semantics clear to users:
> >>
> >> // allows using index-based setters and getters (equivalent to new Row(int))
> >> // method exists for completeness
> >> public static Row withPositions(int length);
> >>
> >> // allows using name-based setters and getters
> >> public static Row withNames();
> >>
> >> // allows using both name-based and position-based setters and getters
> >> public static Row withNamesAndPositions(Map<String, Integer> fieldNames);
> >>
> >> In any case, none of the existing methods will be deprecated and only
> >> additional functionality will be available through the methods above.
> >>
> >> I started a voting thread on Friday. Please feel free to vote.
> >>
> >> Regards,
> >> Timo
> >>
> >> On 10.09.20 10:21, Danny Chan wrote:
> >>> Thanks for driving this Timo, +1 for voting ~
> >>>
> >>> Best,
> >>> Danny Chan
> >>> 在 2020年9月10日 +0800 PM3:54,Timo Walther <[hidden email]>,写道:
> >>>> Thanks everyone for this healthy discussion. I updated the FLIP with the
> >>>> outcome. I think the result is one of the last core API refactorings and
> >>>> users will be happy to have consistent changelog support. Thanks for
> >>>> all the contributions.
> >>>>
> >>>> If there are no objections, I would continue with a voting.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 09.09.20 14:31, Danny Chan wrote:
> >>>>> Thanks, I'm fine with that.
> >>>>>
> >>>>>
> >>>>> Timo Walther <[hidden email]> 于2020年9月9日周三 下午7:02写道:
> >>>>>
> >>>>>> I agree with Jark. It reduces confusion.
> >>>>>>
> >>>>>> The DataStream API doesn't know changelog processing at all. A
> >>>>>> DataStream of Row can be used with both `fromDataStream` and
> >>>>>> `fromChangelogStream`. But only the latter API will interpret it as
> >>>>>> a changelog.
> >>>>>>
> >>>>>> And as I mentioned before, the `toChangelogStream` must work with
> Row
> >>>>>> otherwise users are confused due to duplicate records with a missing
> >>>>>> changeflag.
> >>>>>>
> >>>>>> I will update FLIP-136 one last time. I hope we can then continue
> >>>>>> to a vote.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 09.09.20 10:50, Danny Chan wrote:
> >>>>>>> I think introducing a different API name just because the DataStream
> >>>>>>> generic type is different would bring in much confusion.
> >>>>>>> If there is a ChangelogMode that only works for Row, can we have a
> >>>>>>> type check there?
> >>>>>>>
> >>>>>>> Switching to a new API name does not really solve the problem well;
> >>>>>>> people still need to declare the ChangelogMode explicitly, and there
> >>>>>>> are some confusions:
> >>>>>>>
> >>>>>>> • Should a DataStream of Row type always use #fromChangelogStream?
> >>>>>>> • Does fromChangelogStream work only for the INSERT ChangelogMode?
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Danny Chan
> >>>>>>> 在 2020年9月9日 +0800 PM4:21,Timo Walther <[hidden email]>,写道:
> >>>>>>>> I had this in the initial design, but Jark had concerns at least for
> >>>>>>>> the `toChangelogStream(ChangelogMode)` (see earlier discussion).
> >>>>>>>>
> >>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
> >> possible.
> >>>>>>>>
> >>>>>>>> But in this case I would vote for a symmetric API. If we keep
> >>>>>>>> toChangelogStream we should also have a fromChangelogStream.
> >>>>>>>>
> >>>>>>>> And if we unify `toChangelogStream` and `toDataStream`,
> retractions
> >>>>>>>> cannot be represented for non-Rows and users will experience
> >> duplicate
> >>>>>>>> records with a missing changeflag.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
> >>>>>>>>> “But I think the planner needs to
> >>>>>>>>> know whether the input is insert-only or not.”
> >>>>>>>>>
> >>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>>>>>>>
> >>>>>>>>> solve your concerns ? People can pass around whatever
> ChangelogMode
> >>>>>> they like as an optional param.
> >>>>>>>>> By default: fromDataStream(dataStream, schema), the ChangelogMode
> >> is
> >>>>>> INSERT.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Danny Chan
> >>>>>>>>> 在 2020年9月9日 +0800 PM2:53,[hidden email],写道:
> >>>>>>>>>>
> >>>>>>>>>> But I think the planner needs to
> >>>>>>>>>> know whether the input is insert-only or not.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Timo Walther-2
Hi Jark,

thanks for your feedback. I removed `withNamesAndPositions` from the
public API list and added a comment that it is internal API only, intended for
converters and serializers.

I would start a new vote tomorrow if there are no objections.

What do you think?

Regards,
Timo


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Kurt Young
Sorry for being late, I went through the design doc and here are
my comments:

1. A minor one, how about moving Schema after DataStream in all affected
APIs? Such as:
StreamTableEnvironment.fromDataStream(Schema, DataStream<T>): Table
StreamTableEnvironment.createTemporaryView(String, Schema, DataStream<T>):
Unit
StreamTableEnvironment.fromChangelogStream(Schema, DataStream<Row>): Table
StreamTableEnvironment.toChangelogStream(Schema, Table): DataStream<Row>

It will look more aligned with APIs which don't have Schema. For example:
StreamTableEnvironment.fromDataStream(DataStream<T>): Table
StreamTableEnvironment.fromDataStream(DataStream<T>, Schema): Table

2. A question to: StreamTableEnvironment.fromDataStream(Schema,
DataStream<T>): Table
How do we convert the types between Schema and T? Will we do some
verification? Will we do some type coercion? For example,
can we support Schema.LONG with DataStream<Integer>? And if T is a Tuple,
do we have some rules for setting field names in Schema?
I can see a lot of potential in this method, but the rules are unclear to
me.

3. A question to: StreamTableEnvironment.fromChangelogStream(DataStream<Row>):
Table
How do you derive schema from DataStream<Row>?

4. A question to: StreamTableEnvironment.toDataStream(AbstractDataType<?>,
Table): DataStream<T>
I'm wondering whether this method is necessary. Always getting a
DataStream<Row> from the table and then manually applying a
map function does not seem too cumbersome and is safer (such intelligent
conversion always seems error-prone to me).

5.
> The `toChangelogStream(Schema, Table)` exists for completeness to have a
symmetric API.
> It allows for declaring the data type for output similar to
DynamicTableSinks.
> Additionally, internal structures such as StringData, TimestampData can
still be used by power users.
> In that sense, Row can behave like a GenericRowData.

How does Row behave like GenericRowData? I don't think Row can work with
RowData for now.

6. Row.withNames() seems dangerous to me. It relies on the user setting all the
fields they need via `setField(String name, T value)`.
It's also quite likely that users will not set certain fields, for
example when those fields are NULL. They would expect that all the fields
they didn't set are NULL.
Row.withNames(String[] fieldNames) or Row.withNames(List<String>
fieldNames) seems to be a safer choice (a rough sketch follows below).
I agree that simplicity is important, but making the API safer to use is also
important.
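
To make the concern concrete, a minimal sketch of such a name-fixing factory
(purely illustrative; the class and method names here are made up and not part
of the FLIP): unset fields default to NULL and unknown names fail fast.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: declaring the field names up front fixes the field
// set, lets unset fields default to NULL and rejects unknown names early.
public final class NamedRowSketch {

    private final Map<String, Object> fields = new LinkedHashMap<>();

    private NamedRowSketch(List<String> fieldNames) {
        fieldNames.forEach(name -> fields.put(name, null));
    }

    public static NamedRowSketch withNames(List<String> fieldNames) {
        return new NamedRowSketch(fieldNames);
    }

    public void setField(String name, Object value) {
        if (!fields.containsKey(name)) {
            throw new IllegalArgumentException("Unknown field: " + name);
        }
        fields.put(name, value);
    }

    public Object getField(String name) {
        return fields.get(name);
    }

    public static void main(String[] args) {
        NamedRowSketch row = NamedRowSketch.withNames(Arrays.asList("a", "b", "c"));
        row.setField("a", 1);
        row.setField("b", 2);
        System.out.println(row.getField("c")); // "c" was never set and stays NULL
    }
}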

Best,
Kurt



Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Timo Walther-2
Hi Kurt,

thanks for your feedback.

1. "moving Schema after DataStream": I don't have a strong opinion here.
One could argue that the API would look similar to a CREATE TABLE
statement: first schema then connector. I updated the FLIP.

2. "will we do some verification?"
Yes, we will definitely do verification. It will happen based on what is
available in TypeInformation.

"if T is a Tuple, do we have some rules for setting field names in Schema?"
The rule in this case would be to take the
TupleTypeInfoBase.getFieldNames() similar to the logic we currently have.

"Will we do some type coercion?"
For `fromDataStream()`, type coercion between an explicitly specified
Schema and DataStream will not happen (e.g. DataStream<Integer> !=
Schema.column("f", DataTypes.BIGINT())). Because the user specified the
desired data type explicitly and expects correctness.
For `toDataStream()`, it has similar type coercion semantics as a
regular table sink (first on a logical level, then on a class level).

It is difficult to list all type rules upfront, but it should behave
similarly to all the work done in FLIP-65 and FLIP-95. I would move the
discussion about other type handling to the individual PRs. The general
goal should be to stay backwards compatible while reducing manual schema work.
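
As a rough illustration of the kind of check I mean (plain Java with simplified
types; the real validation will of course work on DataType and TypeInformation,
so treat this only as a sketch):

import java.util.Objects;

// Simplified sketch: fromDataStream(dataStream, schema) should validate that a
// declared column type matches the physical type of the stream instead of coercing.
public final class SchemaValidationSketch {

    static void validateColumn(String name, Class<?> declaredType, Class<?> physicalType) {
        if (!Objects.equals(declaredType, physicalType)) {
            throw new IllegalArgumentException(
                "Column '" + name + "' is declared as " + declaredType.getSimpleName()
                    + " but the DataStream produces " + physicalType.getSimpleName()
                    + "; no implicit coercion is applied.");
        }
    }

    public static void main(String[] args) {
        // DataStream<Integer> combined with Schema.column("f", DataTypes.BIGINT())
        // would be rejected rather than silently widened.
        validateColumn("f", Long.class, Integer.class);
    }
}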

3. "How do you derive schema from DataStream<Row>"

We use RowTypeInfo (if DataStream comes from DataStream API) or
ExternalTypeInfo (if DataStream comes from Table API).
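
For example (just a sketch, and it assumes the stream carries named Row type
information; in a real job the TypeInformation would come from
dataStream.getType()):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

// Sketch: the column names that fromChangelogStream(DataStream<Row>) would derive
// are already carried by the stream's RowTypeInfo (or ExternalTypeInfo).
public final class DerivedSchemaSketch {

    public static void main(String[] args) {
        TypeInformation<Row> typeInfo =
            Types.ROW_NAMED(new String[] {"id", "name"}, Types.INT, Types.STRING);

        RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
        for (String fieldName : rowTypeInfo.getFieldNames()) {
            System.out.println(fieldName); // "id", "name" -> the derived table columns
        }
    }
}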

4. "toDataStream(AbstractDataType<?>, Table) I'm wondering whether this
method is necessary"
Dealing with Row in DataStream API is very inconvenient. With the new
data format converters, the behavior would be consistent across
DataStream API and Table functions. The logic is already present and
seems to be pretty stable so far. We would break a lot of existing code
if we get rid of this method.

5. "How does Row behave like GenericRowData?"

Row can contain StringData or further nested RowData. The data format
converters support that. The conversion of fields would be a no-op in
this case. In the end, both Row and GenericRowData just store an Object[].
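
To make that concrete, a small sketch of what a power user could do (assuming,
as described above, that the converter passes internal structures through
unchanged; the class name is made up):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.Row;

// Sketch: an external Row can carry internal data structures directly; for such
// fields the conversion to the internal GenericRowData becomes a no-op.
public final class InternalDataSketch {

    public static void main(String[] args) {
        Row row = Row.of(
            StringData.fromString("Bob"),
            TimestampData.fromEpochMillis(1_600_000_000_000L));

        GenericRowData rowData = GenericRowData.of(
            StringData.fromString("Bob"),
            TimestampData.fromEpochMillis(1_600_000_000_000L));

        // Both are essentially a typed wrapper around an Object[].
        System.out.println(row.getArity() == rowData.getArity()); // true
    }
}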

6. "They would expect that all the fields they didn't set should be NULL."

But this will be the case. The full list of all field names and their
order is defined by the data type, not the Row instance. During
serialization/conversion we can reorder fields, throw exceptions about
unknown field names, and set remaining fields to NULL.

If a user uses `new Row(5)` but the serializer is configured with a data
type that only supports `Row(3)`, it will also throw an exception at
runtime. We cannot guard users from creating invalid rows. But the
strongly typed serializers/converters will do the final verification.
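
Roughly, the name-based conversion step could look like the following
(a hypothetical sketch of the converter logic, not the planned implementation;
it only illustrates the reordering, NULL filling and fast failure described
above):

import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the converter knows the full field list from the data type
// and turns a name-based map into a positional array, filling unset fields with
// NULL and rejecting names that the data type does not know.
public final class NameBasedConversionSketch {

    static Object[] toPositional(List<String> dataTypeFieldNames, Map<String, ?> namedFields) {
        for (String name : namedFields.keySet()) {
            if (!dataTypeFieldNames.contains(name)) {
                throw new IllegalArgumentException("Unknown field name: " + name);
            }
        }
        Object[] values = new Object[dataTypeFieldNames.size()];
        for (int i = 0; i < dataTypeFieldNames.size(); i++) {
            values[i] = namedFields.get(dataTypeFieldNames.get(i)); // unset -> null
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> fieldNames = Arrays.asList("a", "b", "c");
        Object[] values = toPositional(fieldNames, Map.of("b", 2, "a", 1));
        System.out.println(Arrays.toString(values)); // [1, 2, null]
    }
}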

Regards,
Timo



Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Kurt Young
Thanks for the detailed response, 1-5 sound good to me.

For #6, I just thought of another case which would also annoy users. Consider
code like this:

Row row = Row.withNames();
row.setField("a", 1);
row.setField("b", 2);

and the second time, they change the order of the setField calls:

Row row = Row.withNames();
row.setField("b", 2);
row.setField("a", 1);

I don't think anyone would expect these two rows to actually be different.

Instead, if we at least define the field names first, which fixes the
order, we would not have such side effects.

Best,
Kurt



Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Timo Walther-2
But the examples you mentioned would not be different.

By calling `Row.withNames()`, the row has no definition of position. All
position-based methods would throw an exception.

equals() would return true (and the hashCode() values would be equal) for:

 > Row row1 = Row.withNames();
 > row1.setField("a", 1);
 > row1.setField("b", 2);
 >
 > Row row2 = Row.withNames();
 > row2.setField("b", 2);
 > row2.setField("a", 1);

row2.equals(row1)

The row is just a container for the serializer/converter which will
ensure ordering.
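
For illustration, a simplified sketch of the semantics I have in mind (this is
a model of purely name-based equality, not the planned Row implementation):

import java.util.HashMap;
import java.util.Map;

// Simplified sketch: a purely name-based row has no notion of position, so its
// equality only depends on the (name -> value) mapping, not on the order in
// which setField() was called.
public final class NamedRowEqualitySketch {

    private final Map<String, Object> fields = new HashMap<>();

    public void setField(String name, Object value) {
        fields.put(name, value);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof NamedRowEqualitySketch
            && fields.equals(((NamedRowEqualitySketch) o).fields);
    }

    @Override
    public int hashCode() {
        return fields.hashCode();
    }

    public static void main(String[] args) {
        NamedRowEqualitySketch row1 = new NamedRowEqualitySketch();
        row1.setField("a", 1);
        row1.setField("b", 2);

        NamedRowEqualitySketch row2 = new NamedRowEqualitySketch();
        row2.setField("b", 2);
        row2.setField("a", 1);

        System.out.println(row2.equals(row1)); // true, insertion order is irrelevant
    }
}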

Regards,
Timo

On 23.09.20 15:00, Kurt Young wrote:

> Thanks for the detailed response, 1-5 sounds good to me.
>
> For #6, I just think of another case which would also annoy users. Consider
> code like this:
>
> Row row = Row.withNames();
> row.setField("a", 1);
> row.setField("b", 2);
>
> and for second time, he changes the sequence of setting method calls:
>
> Row row = Row.withNames();
> row.setField("b", 2);
> row.setField("a", 1);
>
> I don't think anyone would expect these two rows are actually different.
>
> Instead, if we at least define the field names first, which will fix the
> order, we would not have such side effects.
>
> Best,
> Kurt
>
>
> On Wed, Sep 23, 2020 at 8:47 PM Timo Walther <[hidden email]> wrote:
>
>> Hi Kurt,
>>
>> thanks for your feedback.
>>
>> 1. "moving Schema after DataStream": I don't have a strong opinion here.
>> One could argue that the API would look similar to a CREATE TABLE
>> statement: first schema then connector. I updated the FLIP.
>>
>> 2. "will we do some verification?"
>> Yes, we will definitely do verification. It will happen based on what is
>> available in TypeInformation.
>>
>> "if T is a Tuple, do we have some rules for setting field names in Schema?"
>> The rule in this case would be to take the
>> TupleTypeInfoBase.getFieldNames() similar to the logic we currently have.
>>
>> "Will we do some type coercion?"
>> For `fromDataStream()`, type coercion between an explicitly specified
>> Schema and DataStream will not happen (e.g. DataStream<Integer> !=
>> Schema.column("f", DataTypes.BIGINT())). Because the user specified the
>> desired data type explicitly and expects correctness.
>> For `toDataStream()`, it has similar type coercion semantics as a
>> regular table sink (first on a logical level, then on a class level).
>>
>> It is difficult to list all type rules upfront, but it should behave
>> similar to all the work done in FLIP-65 and FLIP-95. I would move the
>> discussion about other type handling to the individual PRs. The general
>> goal should be to stay backwards compatible but reduce manual schema work.
>>
>> 3. "How do you derive schema from DataStream<Row>"
>>
>> We use RowTypeInfo (if DataStream comes from DataStream API) or
>> ExternalTypeInfo (if DataStream comes from Table API).
>>
>> 4. "toDataStream(AbstractDataType<?>, Table) I'm wondering whether this
>> method is necessary"
>> Dealing with Row in DataStream API is very inconvenient. With the new
>> data format converters, the behavior would be consistent accross
>> DataStream API and Table functions. The logic is already present and
>> seems to be pretty stable so far. We would break a lot of existing code
>> if we get rid of this method.
>>
>> 5. "How does Row behave like GenericRowData?"
>>
>> Row can contain StringData or further nested RowData. The data format
>> converters support that. The conversion of fields would be a no-op in
>> this case. In the end, both Row and GenericRowData just stored an Object[].
>>
>> 6. "They would expect that all the fields they didn't set should be NULL."
>>
>> But this will be the case. The full list of all field names and their
>> order is defined by the data type, not the Row instance. During
>> serialization/conversion we can reorder fields, throw exceptions about
>> unknown field names, and set remaining fields to NULL.
>>
>> If a user uses `new Row(5)` but the serializer is configured by a data
>> type that only supports `Row(3)`, it will also throw an exception during
>> runtime. We cannot guard users from creating invalid rows. But the
>> strongly typed serializers/converters will do the final verification.
>>
>> Regards,
>> Timo
>>
>>
>> On 23.09.20 12:08, Kurt Young wrote:
>>> Sorry for being late, I went through the design doc and here are
>>> my comments:
>>>
>>> 1. A minor one, how about moving Schema after DataStream in all affected
>>> APIs? Such as:
>>> StreamTableEnvironment.fromDataStream(Schema, DataStream<T>): Table
>>> StreamTableEnvironment.createTemporaryView(String, Schema,
>> DataStream<T>):
>>> Unit
>>> StreamTableEnvironment.fromChangelogStream(Schema, DataStream<Row>):
>> Table
>>> StreamTableEnvironment.toChangelogStream(Schema, Table): DataStream<Row>
>>>
>>> It will look more aligned with APIs which don't have Schema. For example:
>>> StreamTableEnvironment.fromDataStream(DataStream<T>): Table
>>> StreamTableEnvironment.fromDataStream(DataStream<T>, Schema): Table
>>>
>>> 2. A question to: StreamTableEnvironment.fromDataStream(Schema,
>>> DataStream<T>): Table
>>> How do we convert the types between Schema and T, will we do some
>>> verification? Will we do some type coercion? For example,
>>> can we support Schema.LONG with DataStream<Integer>? And if T is a Tuple,
>>> do we have some rules for setting field names in Schema?
>>> I can see a lot of possibilities in this method, but the rules are unclear
>>> to me.
>>>
>>> 3. A question to: StreamTableEnvironment.fromChangelogStream(DataStream<Row>): Table
>>> How do you derive schema from DataStream<Row>?
>>>
>>> 4. A question to: StreamTableEnvironment.toDataStream(AbstractDataType<?>, Table): DataStream<T>
>>> I'm wondering whether this method is necessary. Always getting a
>>> DataStream<Row> from the table and then manually applying a map function
>>> does not seem too cumbersome and is safer (such intelligent conversion
>>> always seems error-prone to me).
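>>>
>>> For comparison, the manual route would be roughly the following sketch
>>> (assuming the proposed toDataStream(Table); index 0 and the String type are
>>> made up for illustration):
>>>
>>> DataStream<Row> rows = tableEnv.toDataStream(table);
>>> DataStream<String> names = rows
>>>     .map(row -> (String) row.getField(0))
>>>     .returns(Types.STRING);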
>>>
>>> 5.
>>>> The `toChangelogStream(Schema, Table)` exists for completeness to have a
>>>> symmetric API.
>>>> It allows for declaring the data type for output similar to
>>>> DynamicTableSinks.
>>>> Additionally, internal structures such as StringData, TimestampData can
>>>> still be used by power users.
>>>> In that sense, Row can behave like a GenericRowData.
>>>
>>> How does Row behave like GenericRowData? I don't think Row can work with
>>> RowData for now.
>>>
>>> 6. Row.withNames() seems dangerous to me. It relies on users setting all the
>>> fields they need via `setField(String name, T value)`.
>>> It's also highly possible that users would not set certain fields when, for
>>> example, some fields are NULL. They would expect that all the fields
>>> they didn't set should be NULL.
>>> Row.withNames(String[] fieldNames) or Row.withNames(List<String>
>>> fieldNames) seems to be a safer choice.
>>> I agree that simplicity is important, but making the API safer to use is
>>> also important.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Wed, Sep 23, 2020 at 4:15 PM Timo Walther <[hidden email]> wrote:
>>>
>>>> Hi Jark,
>>>>
>>>> thanks for your feedback. I removed `withNamesAndPositions` from the
>>>> public API list and added a comment that this is only internal API for
>>>> converters and serializers.
>>>>
>>>> I would start a new vote tomorrow if there are no objections.
>>>>
>>>> What do you think?
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> On 23.09.20 08:55, Jark Wu wrote:
>>>>> Hi Timo,
>>>>>
>>>>> Sorry for the late reply.
>>>>> I think it would be great if we can make `withNamesAndPositions` internally
>>>>> visible. This reduces the complexity of the public API.
>>>>> It's hard to come up with a perfect solution, so let's move on with this
>>>>> FLIP.
>>>>> I don't have other concerns.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Fri, 18 Sep 2020 at 22:14, Timo Walther <[hidden email]> wrote:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> the fieldNames map is not intended for users. I would also be fine with
>>>>>> making it a default-scope constructor and accessing it via some internal
>>>>>> utility class next to the Row class. The fieldNames map must only be
>>>>>> used by serializers and converters. A user has no benefit in using it.
>>>>>>
>>>>>> For the creation of new rows (without reusing, which only advanced users
>>>>>> usually do), I don't see a benefit of having:
>>>>>>
>>>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
>>>>>> reuse.setField("myField", 12);
>>>>>> reuse.setField("myOtherField", "This is a test");
>>>>>>
>>>>>> The purpose of Row.withNames() is to create a Row easily and readably
>>>>>> without declaring 50+ column names or dealing with indices in this range.
>>>>>>
>>>>>> Personally, I would like to make Row an interface and have concrete row
>>>>>> implementations for different purposes, but this would break existing
>>>>>> programs too much.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 18.09.20 11:04, Jark Wu wrote:
>>>>>>> Personally I think the fieldNames Map is confusing and not handy.
>>>>>>> I just have an idea but not sure what you think.
>>>>>>> What about adding a new constructor with a List of field names? This
>>>>>>> enables all name-based setters/getters.
>>>>>>> Regarding the List -> Map conversion cost for every record, we can
>>>>>>> suggest that users reuse the Row in the task.
>>>>>>>
>>>>>>> new Row(int arity)
>>>>>>> new Row(List<String> fieldNames)
>>>>>>>
>>>>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
>>>>>>> reuse.setField("myField", 12);
>>>>>>> reuse.setField("myOtherField", "This is a test");
>>>>>>>
>>>>>>> My point is that, if we have a handy constructor for a named Row, we may
>>>>>>> not need to distinguish between the named-only and positionAndNamed modes.
>>>>>>> This can avoid (fail fast on) the potential problem of setting an invalid
>>>>>>> field.
>>>>>>>
>>>>>>> We can also come up with a new class for the field names which will
>>>>>>> construct the Map and be shared among all Row instances.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Thu, 17 Sep 2020 at 16:48, Timo Walther <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> thanks for all the feedback. I updated the FLIP again on Thursday to
>>>>>>>> integrate the feedback I got from Jingsong and Jark offline. In
>>>>>>>> particular, I updated the `Improve dealing with Row in DataStream API`
>>>>>>>> section another time. We introduced static methods for Row that should
>>>>>>>> make the semantics clear to users:
>>>>>>>>
>>>>>>>> // allows to use index-based setters and getters (equivalent to new Row(int))
>>>>>>>> // method exists for completeness
>>>>>>>> public static Row withPositions(int length);
>>>>>>>>
>>>>>>>> // allows to use name-based setters and getters
>>>>>>>> public static Row withNames();
>>>>>>>>
>>>>>>>> // allows to use both name-based and position-based setters and getters
>>>>>>>> public static Row withNamesAndPositions(Map<String, Integer> fieldNames);
>>>>>>>>
>>>>>>>> In any case, none of the existing methods will be deprecated; only
>>>>>>>> additional functionality will be made available through the methods above.
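>>>>>>>>
>>>>>>>> A short usage sketch of the two public factories above (the values are
>>>>>>>> illustrative only):
>>>>>>>>
>>>>>>>> Row positional = Row.withPositions(2);
>>>>>>>> positional.setField(0, 42);
>>>>>>>>
>>>>>>>> Row named = Row.withNames();
>>>>>>>> named.setField("myField", 42);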
>>>>>>>>
>>>>>>>> I started a voting thread on Friday. Please feel free to vote.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>> On 10.09.20 10:21, Danny Chan wrote:
>>>>>>>>> Thanks for driving this Timo, +1 for voting ~
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Danny Chan
>>>>>>>>> On 10 Sep 2020 at 3:54 PM +0800, Timo Walther <[hidden email]> wrote:
>>>>>>>>>> Thanks everyone for this healthy discussion. I updated the FLIP with
>>>>>>>>>> the outcome. I think the result is one of the last core API
>>>>>>>>>> refactorings, and users will be happy to have consistent changelog
>>>>>>>>>> support. Thanks for all the contributions.
>>>>>>>>>>
>>>>>>>>>> If there are no objections, I would continue with a voting.
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 09.09.20 14:31, Danny Chan wrote:
>>>>>>>>>>> Thanks, I'm fine with that.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 9 Sep 2020 at 7:02 PM, Timo Walther <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I agree with Jark. It reduces confusion.
>>>>>>>>>>>>
>>>>>>>>>>>> The DataStream API doesn't know changelog processing at all. A
>>>>>>>>>>>> DataStream of Row can be used with both `fromDataStream` and
>>>>>>>>>>>> `fromChangelogStream`. But only the latter API will interpret it as
>>>>>>>>>>>> a changelog.
>>>>>>>>>>>>
>>>>>>>>>>>> And as I mentioned before, `toChangelogStream` must work with Row,
>>>>>>>>>>>> otherwise users are confused by duplicate records with a missing
>>>>>>>>>>>> changeflag.
>>>>>>>>>>>>
>>>>>>>>>>>> I will update FLIP-136 one last time. I hope we can then continue
>>>>>>>>>>>> to a vote.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Timo
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 09.09.20 10:50, Danny Chan wrote:
>>>>>>>>>>>>> I think it would bring in much confusion to use a different API
>>>>>>>>>>>>> name just because the DataStream generic type is different.
>>>>>>>>>>>>> If there are ChangelogModes that only work for Row, can we have a
>>>>>>>>>>>>> type check there?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Switching to a new API name does not really solve the problem well;
>>>>>>>>>>>>> people still need to declare the ChangelogMode explicitly, and there
>>>>>>>>>>>>> are some confusions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> • Should a DataStream of Row type always use #fromChangelogStream?
>>>>>>>>>>>>> • Does fromChangelogStream work only for the INSERT ChangelogMode?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Danny Chan
>>>>>>>>>>>>> On 9 Sep 2020 at 4:21 PM +0800, Timo Walther <[hidden email]> wrote:
>>>>>>>>>>>>>> I had this in the initial design, but Jark had concerns at least
>>>>>>>>>>>>>> for the `toChangelogStream(ChangelogMode)` (see earlier discussion).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
>>>>>>>>>>>>>> possible.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But in this case I would vote for a symmetric API. If we keep
>>>>>>>>>>>>>> toChangelogStream we should also have a fromChangelogStream.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> And if we unify `toChangelogStream` and `toDataStream`, retractions
>>>>>>>>>>>>>> cannot be represented for non-Rows and users will experience
>>>>>>>>>>>>>> duplicate records with a missing changeflag.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
>>>>>>>>>>>>>>> “But I think the planner needs to
>>>>>>>>>>>>>>> know whether the input is insert-only or not.”
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
>>>>>>>>>>>>>>> solve your concerns? People can pass around whatever ChangelogMode
>>>>>>>>>>>>>>> they like as an optional param.
>>>>>>>>>>>>>>> By default, with fromDataStream(dataStream, schema), the
>>>>>>>>>>>>>>> ChangelogMode is INSERT.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Danny Chan
>>>>>>>>>>>>>>> On 9 Sep 2020 at 2:53 PM +0800, [hidden email] wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> But I think the planner needs to
>>>>>>>>>>>>>>>> know whether the input is insert-only or not.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
