[DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API


Re: [DISCUSS] FLIP-136: Improve interoperability between DataStream and Table API

Kurt Young
I see, I missed the part that a row is either in position-based mode or
name-based mode.
I can live with this. Thanks.

Best,
Kurt
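The name-based mode discussed in the quoted thread below can be sketched with a simplified, hypothetical row class (plain Java; this is not Flink's actual Row implementation). Fields live in a map, so the order of setField() calls cannot affect equality:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified name-based row (not Flink's actual Row class):
// fields are stored in a map, so the order of setField() calls cannot
// influence equals()/hashCode(), mirroring the semantics of Row.withNames().
final class NamedRow {
    private final Map<String, Object> fields = new HashMap<>();

    void setField(String name, Object value) {
        fields.put(name, value);
    }

    Object getField(String name) {
        return fields.get(name);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof NamedRow && fields.equals(((NamedRow) o).fields);
    }

    @Override
    public int hashCode() {
        return fields.hashCode();
    }

    public static void main(String[] args) {
        NamedRow row1 = new NamedRow();
        row1.setField("a", 1);
        row1.setField("b", 2);

        NamedRow row2 = new NamedRow();
        row2.setField("b", 2);
        row2.setField("a", 1);

        // Different insertion order, same fields: the rows are equal.
        System.out.println(row2.equals(row1)); // prints "true"
    }
}
```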


On Wed, Sep 23, 2020 at 9:07 PM Timo Walther <[hidden email]> wrote:

> But the examples you mentioned would not be different.
>
> By calling `Row.withNames()`, the row has no definition of position. All
> position-based methods would throw an exception.
>
> The equals() method would return true (and the hashCode() values would
> match) for:
>
>  > Row row1 = Row.withNames();
>  > row1.setField("a", 1);
>  > row1.setField("b", 2);
>  >
>  > Row row2 = Row.withNames();
>  > row2.setField("b", 2);
>  > row2.setField("a", 1);
>
> row2.equals(row1)
>
> The row is just a container for the serializer/converter which will
> ensure ordering.
>
> Regards,
> Timo
>
> On 23.09.20 15:00, Kurt Young wrote:
> > Thanks for the detailed response, 1-5 sounds good to me.
> >
> > For #6, I just think of another case which would also annoy users.
> Consider
> > code like this:
> >
> > Row row = Row.withNames();
> > row.setField("a", 1);
> > row.setField("b", 2);
> >
> > and the second time, he changes the sequence of the setter calls:
> >
> > Row row = Row.withNames();
> > row.setField("b", 2);
> > row.setField("a", 1);
> >
> > I don't think anyone would expect these two rows are actually different.
> >
> > Instead, if we at least define the field names first, which will fix the
> > order, we would not have such side effects.
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Sep 23, 2020 at 8:47 PM Timo Walther <[hidden email]> wrote:
> >
> >> Hi Kurt,
> >>
> >> thanks for your feedback.
> >>
> >> 1. "moving Schema after DataStream": I don't have a strong opinion here.
> >> One could argue that the API would look similar to a CREATE TABLE
> >> statement: first schema then connector. I updated the FLIP.
> >>
> >> 2. "will we do some verification?"
> >> Yes, we will definitely do verification. It will happen based on what is
> >> available in TypeInformation.
> >>
> >> "if T is a Tuple, do we have some rules for setting field names in
> Schema?"
> >> The rule in this case would be to take the
> >> TupleTypeInfoBase.getFieldNames() similar to the logic we currently
> have.
> >>
> >> "Will we do some type coercion?"
> >> For `fromDataStream()`, type coercion between an explicitly specified
> >> Schema and DataStream will not happen (e.g. DataStream<Integer> !=
> >> Schema.column("f", DataTypes.BIGINT())). Because the user specified the
> >> desired data type explicitly and expects correctness.
> >> For `toDataStream()`, it has similar type coercion semantics as a
> >> regular table sink (first on a logical level, then on a class level).
> >>
> >> It is difficult to list all type rules upfront, but it should behave
> >> similar to all the work done in FLIP-65 and FLIP-95. I would move the
> >> discussion about other type handling to the individual PRs. The general
> >> goal should be to stay backwards compatible but reduce manual schema
> work.
> >>
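The no-coercion rule for `fromDataStream()` described above can be sketched as a plain verification step (hypothetical `SchemaVerifier` comparing type names as strings; not Flink's actual logic):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the verification described above (not Flink's actual
// logic): for fromDataStream(), an explicitly declared schema type must match
// the type derived from the DataStream exactly -- no coercion is applied.
final class SchemaVerifier {
    static void verify(List<String> declared, List<String> derived) {
        if (!declared.equals(derived)) {
            throw new IllegalArgumentException(
                    "Schema mismatch: declared " + declared
                            + " but stream provides " + derived);
        }
    }

    public static void main(String[] args) {
        // Matching types pass silently.
        verify(Arrays.asList("INT", "STRING"), Arrays.asList("INT", "STRING"));

        try {
            // DataStream<Integer> vs. Schema.column("f", BIGINT): rejected.
            verify(Arrays.asList("BIGINT"), Arrays.asList("INT"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```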
> >> 3. "How do you derive schema from DataStream<Row>"
> >>
> >> We use RowTypeInfo (if DataStream comes from DataStream API) or
> >> ExternalTypeInfo (if DataStream comes from Table API).
> >>
> >> 4. "toDataStream(AbstractDataType<?>, Table) I'm wondering whether this
> >> method is necessary"
> >> Dealing with Row in DataStream API is very inconvenient. With the new
> >> data format converters, the behavior would be consistent across
> >> DataStream API and Table functions. The logic is already present and
> >> seems to be pretty stable so far. We would break a lot of existing code
> >> if we get rid of this method.
> >>
> >> 5. "How does Row behave like GenericRowData?"
> >>
> >> Row can contain StringData or further nested RowData. The data format
> >> converters support that. The conversion of fields would be a no-op in
> >> this case. In the end, both Row and GenericRowData just store an
> >> Object[].
> >>
> >> 6. "They would expect that all the fields they didn't set should be
> NULL."
> >>
> >> But this will be the case. The full list of all field names and their
> >> order is defined by the data type, not the Row instance. During
> >> serialization/conversion we can reorder fields, throw exceptions about
> >> unknown field names, and set remaining fields to NULL.
> >>
> >> If a user uses `new Row(5)` but the serializer is configured by a data
> >> type that only supports `Row(3)`, it will also throw an exception during
> >> runtime. We cannot guard users from creating invalid rows. But the
> >> strongly typed serializers/converters will do the final verification.
> >>
> >> Regards,
> >> Timo
> >>
> >>
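Point 6 above (the data type, not the Row instance, defines field order, rejects unknown names, and NULL-fills unset fields) can be sketched as follows (hypothetical converter class, not Flink's actual data format converters):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the verification Timo describes (not Flink's actual
// data format converters): the data type's field list -- not the Row instance --
// fixes the order. Unknown names fail fast; unset fields become NULL.
final class NamedFieldConverter {
    private final List<String> fieldOrder; // defined by the data type

    NamedFieldConverter(List<String> fieldOrder) {
        this.fieldOrder = fieldOrder;
    }

    Object[] toPositioned(Map<String, Object> namedFields) {
        for (String name : namedFields.keySet()) {
            if (!fieldOrder.contains(name)) {
                throw new IllegalArgumentException("Unknown field: " + name);
            }
        }
        Object[] values = new Object[fieldOrder.size()];
        for (int i = 0; i < fieldOrder.size(); i++) {
            values[i] = namedFields.get(fieldOrder.get(i)); // null if unset
        }
        return values;
    }

    public static void main(String[] args) {
        NamedFieldConverter converter =
                new NamedFieldConverter(Arrays.asList("a", "b", "c"));

        Map<String, Object> named = new HashMap<>();
        named.put("b", 2);
        named.put("a", 1); // order of puts is irrelevant

        // "c" was never set, so it is filled with null.
        System.out.println(Arrays.toString(converter.toPositioned(named)));
        // prints "[1, 2, null]"
    }
}
```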
> >> On 23.09.20 12:08, Kurt Young wrote:
> >>> Sorry for being late, I went through the design doc and here are
> >>> my comments:
> >>>
> >>> 1. A minor one, how about moving Schema after DataStream in all
> affected
> >>> APIs? Such as:
> >>> StreamTableEnvironment.fromDataStream(Schema, DataStream<T>): Table
> >>> StreamTableEnvironment.createTemporaryView(String, Schema,
> >> DataStream<T>):
> >>> Unit
> >>> StreamTableEnvironment.fromChangelogStream(Schema, DataStream<Row>):
> >> Table
> >>> StreamTableEnvironment.toChangelogStream(Schema, Table):
> DataStream<Row>
> >>>
> >>> It will look more aligned with APIs which don't have Schema. For
> example:
> >>> StreamTableEnvironment.fromDataStream(DataStream<T>): Table
> >>> StreamTableEnvironment.fromDataStream(DataStream<T>, Schema): Table
> >>>
> >>> 2. A question to: StreamTableEnvironment.fromDataStream(Schema,
> >>> DataStream<T>): Table
> >>> How do we convert the types between Schema and T, will we do some
> >>> verification? Will we do some type coercion? For example,
> >>> can we support Schema.LONG with DataStream<Integer>? And if T is a
> Tuple,
> >>> do we have some rules for setting field names in Schema?
> >>> I can see lots of imagination from this method but the rules are
> unclear
> >> to
> >>> me.
> >>>
> >>> 3. A question to:
> >> StreamTableEnvironment.fromChangelogStream(DataStream<Row>):
> >>> Table
> >>> How do you derive schema from DataStream<Row>?
> >>>
> >>> 4. A question to:
> >> StreamTableEnvironment.toDataStream(AbstractDataType<?>,
> >>> Table): DataStream<T>
> >>> I'm wondering whether this method is necessary. Always getting a
> >>> DataStream<Row> from the table and then manually applying a
> >>> map function seems not cumbersome and safer (such intelligent
> >>> conversion always seems error-prone to me).
> >>>
> >>> 5.
> >>>> The `toChangelogStream(Schema, Table)` exists for completeness to
> have a
> >>> symmetric API.
> >>>> It allows for declaring the data type for output similar to
> >>> DynamicTableSinks.
> >>>> Additionally, internal structures such as StringData, TimestampData
> can
> >>> still be used by power users.
> >>>> In that sense, Row can behave like a GenericRowData.
> >>>
> >>> How does Row behave like GenericRowData? I don't think Row can work
> with
> >>> RowData for now.
> >>>
> >>> 6. Row.withNames() seems dangerous to me. It relies on users setting
> >>> all the fields they need during `setField(String name, T value)`.
> >>> It's also highly possible that users would not set certain fields
> >>> when, for example, some fields are NULL. They would expect that all
> >>> the fields they didn't set should be NULL.
> >>> Row.withNames(String[] fieldNames) or Row.withNames(List<String>
> >>> fieldNames) seems to be a safer choice.
> >>> I agree that simplicity is important but making API safer to use is
> also
> >>> important.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Wed, Sep 23, 2020 at 4:15 PM Timo Walther <[hidden email]>
> wrote:
> >>>
> >>>> Hi Jark,
> >>>>
> >>>> thanks for your feedback. I removed `withNamesAndPositions` from the
> >>>> public API list and added a comment that this is only internal API for
> >>>> converters and serializers.
> >>>>
> >>>> I would start a new vote tomorrow if there are no objections.
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> On 23.09.20 08:55, Jark Wu wrote:
> >>>>> Hi Timo,
> >>>>>
> >>>>> Sorry for the late reply.
> >>>>> I think it would be great if we can make `withNamesAndPositions`
> >> internal
> >>>>> visible. This reduces the complexity of the public API.
> >>>>> It's hard to come up with a perfect solution. So let's move on this
> >> FLIP.
> >>>>> I don't have other concerns.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> On Fri, 18 Sep 2020 at 22:14, Timo Walther <[hidden email]>
> wrote:
> >>>>>
> >>>>>> Hi Jark,
> >>>>>>
> >>>>>> the fieldNames map is not intended for users. I would also be fine
> to
> >>>>>> make it a default scope constructor and access it with some internal
> >>>>>> utility class next to the Row class. The fieldNames map must only be
> >>>>>> used by serializers and converters. A user has no benefit in using
> it.
> >>>>>>
> >>>>>> For the creation of new rows (without reusing, which only advanced
> >> users
> >>>>>> usually do), I don't see a benefit of having:
> >>>>>>
> >>>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> >>>>>> reuse.setField("myField", 12);
> >>>>>> reuse.setField("myOtherField", "This is a test");
> >>>>>>
> >>>>>> The purpose of Row.withNames() is to create a Row easily and
> >>>>>> readably, without declaring 50+ column names or dealing with indices
> >>>>>> in this range.
> >>>>>>
> >>>>>> Personally, I would like to make Row an interface and have concrete
> >> row
> >>>>>> implementations for different purposes but this would break existing
> >>>>>> programs too much.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 18.09.20 11:04, Jark Wu wrote:
> >>>>>>> Personally I think the fieldNames Map is confusing and not handy.
> >>>>>>> I just have an idea but not sure what you think.
> >>>>>>> What about adding a new constructor with a List of field names?
> >>>>>>> This enables all name-based setters/getters.
> >>>>>>> Regarding the List -> Map cost for every record, we can suggest
> >>>>>>> that users reuse the Row in the task.
> >>>>>>>
> >>>>>>> new Row(int arity)
> >>>>>>> new Row(List<String> fieldNames)
> >>>>>>>
> >>>>>>> final Row reuse = new Row(Arrays.asList("myField", "myOtherField"))
> >>>>>>> reuse.setField("myField", 12);
> >>>>>>> reuse.setField("myOtherField", "This is a test");
> >>>>>>>
> >>>>>>> My point is that, if we can have a handy constructor for a named
> >>>>>>> Row, we may not need to distinguish between the name-only and
> >>>>>>> positionAndNamed modes.
> >>>>>>> This can avoid (fast-fail) the potential problem when setting an
> >>>> invalid
> >>>>>>> field.
> >>>>>>>
> >>>>>>> We can also come up with a new class for the field names which will
> >>>>>>> construct the Map and be shared among all Row instances.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jark
> >>>>>>>
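Jark's idea of a field-name class shared among all Row instances can be sketched like this (hypothetical classes, not part of the FLIP's proposed API):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of Jark's idea (hypothetical classes): the name-to-index map is built
// once per schema and shared by all rows of that shape, so the per-record cost
// stays a plain array access, and invalid field names fail fast.
final class FieldNames {
    private final Map<String, Integer> indexByName;

    FieldNames(List<String> names) {
        Map<String, Integer> m = new HashMap<>();
        for (int i = 0; i < names.size(); i++) {
            m.put(names.get(i), i);
        }
        this.indexByName = Collections.unmodifiableMap(m);
    }

    int indexOf(String name) {
        Integer i = indexByName.get(name);
        if (i == null) { // fast-fail on an invalid field
            throw new IllegalArgumentException("Unknown field: " + name);
        }
        return i;
    }

    int size() {
        return indexByName.size();
    }
}

final class SharedNamesRow {
    private final FieldNames names; // shared across all rows of this schema
    private final Object[] values;

    SharedNamesRow(FieldNames names) {
        this.names = names;
        this.values = new Object[names.size()];
    }

    void setField(String name, Object value) {
        values[names.indexOf(name)] = value;
    }

    Object getField(String name) {
        return values[names.indexOf(name)];
    }

    public static void main(String[] args) {
        FieldNames schema = new FieldNames(Arrays.asList("myField", "myOtherField"));

        SharedNamesRow reuse = new SharedNamesRow(schema); // reused per task
        reuse.setField("myField", 12);
        reuse.setField("myOtherField", "This is a test");

        System.out.println(reuse.getField("myField")); // prints "12"
    }
}
```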
> >>>>>>> On Thu, 17 Sep 2020 at 16:48, Timo Walther <[hidden email]>
> >> wrote:
> >>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> thanks for all the feedback. I updated the FLIP again on Thursday
> to
> >>>>>>>> integrate the feedback I got from Jingsong and Jark offline. In
> >>>>>>>> particular I updated the `Improve dealing with Row in DataStream
> >> API`
> >>>>>>>> section another time. We introduced static methods for Row that
> >> should
> >>>>>>>> make the semantics clear to users:
> >>>>>>>>
> >>>>>>>> // allows to use index-based setters and getters
> >>>>>>>> // (equivalent to new Row(int)); method exists for completeness
> >>>>>>>> public static Row withPositions(int length);
> >>>>>>>>
> >>>>>>>> // allows to use name-based setters and getters
> >>>>>>>> public static Row withNames();
> >>>>>>>>
> >>>>>>>> // allows to use both name-based and position-based setters and
> >>>>>>>> // getters
> >>>>>>>> public static Row withNamesAndPositions(Map<String, Integer> fieldNames);
> >>>>>>>>
> >>>>>>>> In any case, none of the existing methods will be deprecated and
> >>>>>>>> only additional functionality will be available through the methods
> >>>>>>>> above.
> >>>>>>>>
> >>>>>>>> I started a voting thread on Friday. Please feel free to vote.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
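The three factory methods above imply that a row is in exactly one mode and that accessors of the wrong kind fail fast; a simplified, hypothetical sketch of that mode handling (not the actual Flink implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the three construction modes (not the actual Flink
// implementation): a row is position-based, name-based, or both, and calling
// an accessor the mode does not support fails fast.
final class ModalRow {
    private final Object[] byPosition;            // null in name-only mode
    private final Map<String, Integer> nameToPos; // null unless both modes
    private final Map<String, Object> byName;     // used in name-only mode

    private ModalRow(Object[] byPosition, Map<String, Integer> nameToPos) {
        this.byPosition = byPosition;
        this.nameToPos = nameToPos;
        this.byName = byPosition == null ? new HashMap<>() : null;
    }

    // equivalent to new Row(int); exists for completeness
    static ModalRow withPositions(int length) {
        return new ModalRow(new Object[length], null);
    }

    static ModalRow withNames() {
        return new ModalRow(null, null);
    }

    static ModalRow withNamesAndPositions(Map<String, Integer> fieldNames) {
        return new ModalRow(new Object[fieldNames.size()], fieldNames);
    }

    void setField(int pos, Object value) {
        if (byPosition == null) {
            throw new IllegalStateException("Row has no positions");
        }
        byPosition[pos] = value;
    }

    void setField(String name, Object value) {
        if (byName != null) {
            byName.put(name, value);
        } else if (nameToPos != null) {
            byPosition[nameToPos.get(name)] = value;
        } else {
            throw new IllegalStateException("Row has no field names");
        }
    }

    Object getField(int pos) {
        if (byPosition == null) {
            throw new IllegalStateException("Row has no positions");
        }
        return byPosition[pos];
    }

    public static void main(String[] args) {
        ModalRow named = ModalRow.withNames();
        named.setField("a", 1);
        try {
            named.setField(0, 1); // position-based access in name-only mode
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Row has no positions"
        }
    }
}
```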
> >>>>>>>> On 10.09.20 10:21, Danny Chan wrote:
> >>>>>>>>> Thanks for driving this Timo, +1 for voting ~
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Danny Chan
> >>>>>>>>> On Sep 10, 2020 at 3:54 PM +0800, Timo Walther <[hidden email]> wrote:
> >>>>>>>>>> Thanks everyone for this healthy discussion. I updated the FLIP
> >> with
> >>>>>> the
> >>>>>>>>>> outcome. I think the result is one of the last core API
> >> refactoring
> >>>>>> and
> >>>>>>>>>> users will be happy to have a consistent changelog support.
> Thanks
> >>>> for
> >>>>>>>>>> all the contributions.
> >>>>>>>>>>
> >>>>>>>>>> If there are no objections, I would continue with a voting.
> >>>>>>>>>>
> >>>>>>>>>> What do you think?
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 09.09.20 14:31, Danny Chan wrote:
> >>>>>>>>>>> Thanks, I'm fine with that.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Timo Walther <[hidden email]> wrote on Wed, Sep 9, 2020 at 7:02 PM:
> >>>>>>>>>>>
> >>>>>>>>>>>> I agree with Jark. It reduces confusion.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The DataStream API doesn't know changelog processing at all. A
> >>>>>>>>>>>> DataStream of Row can be used with both `fromDataStream` and
> >>>>>>>>>>>> `fromChangelogStream`. But only the latter API will interpret
> >>>>>>>>>>>> it as a changelog.
> >>>>>>>>>>>>
> >>>>>>>>>>>> And as I mentioned before, the `toChangelogStream` must work
> >> with
> >>>>>> Row
> >>>>>>>>>>>> otherwise users are confused due to duplicate records with a
> >>>> missing
> >>>>>>>>>>>> changeflag.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I will update the FLIP-136 a last time. I hope we can then
> >>>> continue
> >>>>>>>> to a
> >>>>>>>>>>>> vote.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 09.09.20 10:50, Danny Chan wrote:
> >>>>>>>>>>>>> I think it would bring in much confusion to have a different
> >>>>>>>>>>>>> API name just because the DataStream generic type is different.
> >>>>>>>>>>>>> If there is a ChangelogMode that only works for Row, can we
> >>>>>>>>>>>>> have a type check there?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Switching to a new API name does not really solve the problem
> >>>>>>>>>>>>> well; people still need to declare the ChangelogMode explicitly,
> >>>>>>>>>>>>> and there are some confusions:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> • Should a DataStream of Row type always use #fromChangelogStream?
> >>>>>>>>>>>>> • Does fromChangelogStream work for only the INSERT ChangelogMode?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>> On Sep 9, 2020 at 4:21 PM +0800, Timo Walther <[hidden email]> wrote:
> >>>>>>>>>>>>>> I had this in the initial design, but Jark had concerns at
> >>>>>>>>>>>>>> least for the `toChangelogStream(ChangelogMode)` (see earlier
> >>>>>>>>>>>>>> discussion).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> `fromDataStream(dataStream, schema, changelogMode)` would be
> >>>>>>>> possible.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> But in this case I would vote for a symmetric API. If we
> keep
> >>>>>>>>>>>>>> toChangelogStream we should also have a fromChangelogStream.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> And if we unify `toChangelogStream` and `toDataStream`,
> >>>>>> retractions
> >>>>>>>>>>>>>> cannot be represented for non-Rows and users will experience
> >>>>>>>> duplicate
> >>>>>>>>>>>>>> records with a missing changeflag.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 09.09.20 09:31, Danny Chan wrote:
> >>>>>>>>>>>>>>> “But I think the planner needs to
> >>>>>>>>>>>>>>> know whether the input is insert-only or not.”
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Does fromDataStream(dataStream, schema, changelogMode)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> solve your concerns ? People can pass around whatever
> >>>>>> ChangelogMode
> >>>>>>>>>>>> they like as an optional param.
> >>>>>>>>>>>>>>> By default: fromDataStream(dataStream, schema), the
> >>>> ChangelogMode
> >>>>>>>> is
> >>>>>>>>>>>> INSERT.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>>> On Sep 9, 2020 at 2:53 PM +0800, [hidden email] wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> But I think the planner needs to
> >>>>>>>>>>>>>>>> know whether the input is insert-only or not.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>