[DISCUSS] Enhancing the functionality and productivity of Table API

[DISCUSS] Enhancing the functionality and productivity of Table API

jincheng sun
Hi all,

With the continuous efforts from the community, the Flink system has been
continuously improved, which has attracted more and more users. Flink SQL
is a canonical, widely used relational query language. However, there are
still some scenarios where Flink SQL fails to meet user needs in terms of
functionality and ease of use, such as:


1. In terms of functionality
   Iteration, user-defined window, user-defined join, user-defined
   GroupReduce, etc. cannot be expressed with SQL.

2. In terms of ease of use
   - Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(),
     udf2(), udf3()...)” can accomplish the same thing, a map() function
     returning 100 columns would require defining or calling 100 UDFs
     when using SQL, which is quite involved.
   - FlatMap - e.g. “dataStream.flatMap(flatMapFun)”. Similarly, it can be
     implemented with “table.join(udtf).select()”, but the DataStream
     version is clearly easier to use than SQL.


For these two reasons, some users have to use the DataStream API or the
DataSet API instead. But when they do, they lose the unification of batch
and streaming, as well as the sophisticated optimizations of Flink SQL,
such as code generation, aggregate-join transposition, and multi-stage
aggregation.

We believe that enhancing functionality and productivity is vital for the
successful adoption of the Table API. To this end, the Table API still
requires more effort from every contributor in the community. We see a
great opportunity to improve our users' experience through this work. Any
feedback is welcome.

Regards,

Jincheng

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

jincheng sun
*-------- I am sorry for the formatting of the previous email. I have
reformatted the content as follows. --------*

*Hi all,*

With the continuous efforts from the community, the Flink system has been
continuously improved, which has attracted more and more users. Flink SQL
is a canonical, widely used relational query language. However, there are
still some scenarios where Flink SQL fails to meet user needs in terms of
functionality and ease of use, such as:

*1. In terms of functionality*
    Iteration, user-defined window, user-defined join, user-defined
GroupReduce, etc. cannot be expressed with SQL.

*2. In terms of ease of use*

   - Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(),
   udf2(), udf3()...)” can accomplish the same thing, a map() function
   returning 100 columns would require defining or calling 100 UDFs when
   using SQL, which is quite involved.
   - FlatMap - e.g. “dataStream.flatMap(flatMapFun)”. Similarly, it can be
   implemented with “table.join(udtf).select()”, but the DataStream version
   is clearly easier to use than SQL.
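The map-vs-UDFs gap can be illustrated without any Flink dependency. The sketch below uses plain Java stand-ins (the `List<Object>` row, `mapFun`, and the `udfN` lambdas are invented for illustration and are not Flink classes): a row-based function produces all output columns in one call, while the select-style version needs one scalar function defined and wired up per output column.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class MapVsUdfs {
    // One row-based function computes every output column at once.
    static List<Object> mapFun(List<Object> row) {
        List<Object> out = new ArrayList<>();
        out.add((Integer) row.get(0) + 1);            // output column 1
        out.add(((String) row.get(1)).toUpperCase()); // output column 2
        // ... a real job might emit 100 columns here, still one function.
        return out;
    }

    public static void main(String[] args) {
        List<Object> row = List.of(41, "ok");

        // Row-based style: a single call.
        List<Object> mapped = mapFun(row);

        // select(udf1(), udf2(), ...) style: one scalar UDF per column,
        // each declared separately.
        Function<List<Object>, Object> udf1 = r -> (Integer) r.get(0) + 1;
        Function<List<Object>, Object> udf2 = r -> ((String) r.get(1)).toUpperCase();
        List<Object> selected = List.of(udf1.apply(row), udf2.apply(row));

        // Same result, very different ergonomics at 100 columns.
        System.out.println(mapped.equals(selected));
    }
}
```

With two columns the difference is cosmetic; with 100 output columns, the select style means 100 separate UDF definitions against one map function.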

For these two reasons, some users have to use the DataStream API or the
DataSet API instead. But when they do, they lose the unification of batch
and streaming, as well as the sophisticated optimizations of Flink SQL,
such as code generation, aggregate-join transposition, and multi-stage
aggregation.

We believe that enhancing functionality and productivity is vital for the
successful adoption of the Table API. To this end, the Table API still
requires more effort from every contributor in the community. We see a
great opportunity to improve our users' experience through this work. Any
feedback is welcome.

Regards,

Jincheng

jincheng sun <[hidden email]> wrote on Thursday, November 1, 2018, at 5:07 PM:

> [...]

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

Aljoscha Krettek-2
Hi Jincheng,

these points sound very good! Are there any concrete proposals for changes? For example a FLIP/design document?

See here for FLIPs: https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

Best,
Aljoscha

> On 1. Nov 2018, at 12:51, jincheng sun <[hidden email]> wrote:
>
> [...]

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

Timo Walther-2
Hi Jincheng,

I was also thinking about introducing a process function for the Table
API several times. It would allow defining more complex logic (custom
windows, timers, etc.) embedded in a relational API, with schema
awareness and optimization around the black box. Of course, this would
mean that the Table API diverges from the SQL API; however, it would also
open the Table API up to more event-driven applications.

Maybe it would be possible to define timers and firing logic using Table
API expressions and UDFs. During planning, this would be treated as a
special Calc node.
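Purely as a thought experiment, such an embedded timer definition might look something like the following sketch; every method name and expression here is invented for illustration and exists in no Flink release:

```java
// Hypothetical sketch -- none of these methods exist in Flink today.
// Timers and firing logic are written as Table API expressions/UDFs;
// the planner would fold them into a special Calc node.
Table result = input
    .groupBy("user")
    .registerTimer("rowtime + interval '10' second")  // invented timer expression
    .onTimer("emitAggregate(user, amount)")           // invented firing hook (a UDF)
    .select("user, total");
```

The point of the sketch is only that timers could stay inside the relational, schema-aware layer rather than forcing a drop to DataStream.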

Just some ideas that might be interesting for new use cases.

Regards,
Timo


On 01.11.18 at 13:12, Aljoscha Krettek wrote:

> [...]


Re: [DISCUSS] Enhancing the functionality and productivity of Table API

jincheng sun
In reply to this post by Aljoscha Krettek-2
Hi, Aljoscha,

Thanks for your feedback and suggestions. I think you are right: a
detailed design/FLIP is very necessary. Before writing the detailed design
or opening a FLIP, I would like to hear the community's views on enhancing
the functionality and productivity of the Table API, to ensure that it is
worth the effort. If most community members agree with the proposal, I
will list the changes and discuss them with the whole community. Does that
make sense to you?

Thanks,
Jincheng

Aljoscha Krettek <[hidden email]> wrote on Thursday, November 1, 2018, at 8:12 PM:

> [...]

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

Aljoscha Krettek-2
Yes, that makes sense!

> On 1. Nov 2018, at 15:51, jincheng sun <[hidden email]> wrote:
>
> [...]


Re: [DISCUSS] Enhancing the functionality and productivity of Table API

jincheng sun
In reply to this post by Timo Walther-2
Hi Timo,
I am very grateful for your feedback, and I was excited to hear that you
have also considered adding a process function to the Table API.

I agree that we should add support for a process function on the Table
API; it is actually part of my proposal for enhancing the functionality of
the Table API. In fact, supporting a ProcessFunction means supporting
user-defined operators. As you said, a ProcessFunction can implement any
logic, including user-defined windows, which leaves the user with enough
freedom and control. At the same time, a Co-ProcessFunction needs to be
supported, so that we can implement user-defined join logic through it. Of
course, a Co-ProcessFunction also requires introducing the concept of
connect, which would add a new ConnectedTable type to the Table API. I
also think this would open the Table API up to more event-driven
applications.

Regarding the ProcessFunction: apart from its timer functionality, it is
essentially equivalent to a FlatMapFunction. So perhaps we can support map
and flatMap on Table, and support a process function on GroupedTable,
because for state reasons the timers of a ProcessFunction can only be
applied to a keyed stream.

You are right that ANSI SQL can hardly express complex operator logic such
as a ProcessFunction. So once we decide to make these enhancements to the
Table API, it means that Flink SQL covers only ANSI SQL operations, while
the Table API's operations form a superset of SQL. The Flink high-level
APIs would then include both a query language (SQL) and a powerful
programming language (the Table API). In this way, SQL serves the simpler
ETL user groups, while the Table API serves users who need to customize
complex logic, and both groups enjoy the benefits of the query optimizer.
It may take more refinement and hard work to support these functions, but
this may be a good direction for our efforts.
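To make the proposal concrete, a hypothetical sketch of how these operations might sit in the API; all names, types, and signatures below are invented for illustration and are not an existing Flink API:

```java
// Hypothetical API sketch -- invented names and signatures.
table.map(mapFun);          // row-based map on Table
table.flatMap(flatMapFun);  // row-based flatMap on Table

// Timers need keyed state, so a process function would live on GroupedTable:
table.groupBy("user").process(processFun);

// A user-defined join via a Co-ProcessFunction would require a connect()
// operation yielding a new ConnectedTable type:
tableA.connect(tableB).process(coProcessFun);
```

The split mirrors the DataStream API, where ProcessFunction timers are likewise only available on keyed streams.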

Thanks,
Jincheng

Timo Walther <[hidden email]> wrote on Thursday, November 1, 2018, at 10:08 PM:

> [...]

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

Becket Qin
Thanks for the proposal, Jincheng.

This makes a lot of sense. As a programming interface, Table API is
especially attractive because it supports both batch and stream. However,
the relational-only API often forces users to shoehorn their logic into a
bunch of user-defined functions. Introducing more flexible APIs (e.g.
row-based APIs) to process records would really help here.
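
To make the ergonomic difference concrete, here is a minimal, Flink-free sketch in plain Python. The `Table`, `select`, and `map` names below are illustrative stand-ins for the discussion, not the real Flink Table API:

```python
# Illustrative stand-ins only -- not real Flink Table API classes or signatures.

class Table:
    def __init__(self, rows):
        self.rows = rows  # each row is a dict of column name -> value

    def select(self, **column_udfs):
        # column-wise style: one UDF per output column
        return Table([{name: udf(row) for name, udf in column_udfs.items()}
                      for row in self.rows])

    def map(self, row_fun):
        # row-based style: a single function produces the whole output row
        return Table([row_fun(row) for row in self.rows])

t = Table([{"a": 1, "b": 2}])

# With select, every derived column needs its own function...
wide1 = t.select(sum_ab=lambda r: r["a"] + r["b"],
                 diff_ab=lambda r: r["a"] - r["b"])

# ...while one map function can emit arbitrarily many columns at once.
wide2 = t.map(lambda r: {"sum_ab": r["a"] + r["b"],
                         "diff_ab": r["a"] - r["b"]})

assert wide1.rows == wide2.rows
```

With 100 derived columns, the `select` style needs 100 separate UDFs, while the `map` style needs a single function.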

Besides the processing API, another useful improvement would be allowing
batch tables and stream tables to run in the same job, which is actually a
quite common scenario.

As you said, there is a lot of work that could be done here. I am looking
forward to the upcoming FLIPs.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 2, 2018 at 12:10 AM jincheng sun <[hidden email]>
wrote:

> Hi, Timo,
> I am very grateful for your feedback, and I was very excited to hear
> that you are also considering adding a process function to the Table API.
>
> I agree that we should add support for the ProcessFunction in the Table API, which
> is actually part of my proposal for enhancing the functionality of the Table API.
> In fact, supporting the ProcessFunction means supporting user-defined
> operators. As you said, a ProcessFunction can implement any logic, including
> user-defined windows, which leaves the user with enough freedom and
> control. At the same time, the CoProcessFunction needs to be supported, so we
> can implement the logic of a user-defined JOIN through the CoProcessFunction. Of
> course, the CoProcessFunction also needs to introduce the concept of Connect,
> and will introduce a new ConnectedTable type in the Table API. I also think
> this opens the Table API up to more event-driven applications.
>
> Regarding the ProcessFunction: apart from the timer facility, it should be
> completely equivalent to a FlatMapFunction, so maybe we can support map and
> flatMap on Table and support the ProcessFunction on GroupedTable, because, for
> state-related reasons, the timer of a ProcessFunction can only be applied to a
> KeyedStream.
>
> You are right, ANSI SQL makes it difficult to express complex operator logic such
> as a ProcessFunction, so once we decide to make these enhancements to the
> Table API, it means that Flink SQL includes only ANSI SQL operations,
> while the Table API's operations are a superset of SQL. This means that Flink's
> high-level APIs consist of a query language (SQL) and a powerful programming
> language (the Table API). In this way, SQL serves the simpler ETL user
> groups, while the Table API is for users who need to customize
> complex logic, and these users can still enjoy the benefits of the query
> optimizer. We may need more refinement and hard work to support these
> functions, but this seems like a good direction for our efforts.
>
> Thanks,
> Jincheng
>
> Timo Walther <[hidden email]> wrote on Thu, Nov 1, 2018, 10:08 PM:
>
> > Hi Jincheng,
> >
> > I was also thinking about introducing a process function for the Table
> > API several times. This would allow defining more complex logic (custom
> > windows, timers, etc.) embedded in a relational API with schema
> > awareness and optimization around the black box. Of course, this would
> > mean that the Table API diverges from the SQL API; however, it would open
> > the Table API up for more event-driven applications.
> >
> > Maybe it would be possible to define timers and firing logic using Table
> > API expressions and UDFs. Within planning this would be treated as a
> > special Calc node.
> >
> > Just some ideas that might be interesting for new use cases.
> >
> > Regards,
> > Timo
> >
> >
> > On 01.11.18 at 13:12, Aljoscha Krettek wrote:
> > > Hi Jincheng,
> > >
> > > these points sound very good! Are there any concrete proposals for
> > changes? For example a FLIP/design document?
> > >
> > > See here for FLIPs:
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > >
> > > Best,
> > > Aljoscha

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

Shaoxuan Wang
In reply to this post by Aljoscha Krettek-2
Hi Aljoscha,
Glad that you like the proposal. We have completed prototypes of most of the
newly proposed functionalities. Once we collect feedback from the community, we
will come up with a concrete FLIP/design doc.

Regards,
Shaoxuan



Re: [DISCUSS] Enhancing the functionality and productivity of Table API

Hequn Cheng
Hi Jincheng,

Thanks a lot for your proposal. It is very encouraging!

As we all know, SQL is a widely used language. It follows standards, is a
descriptive language, and is easy to use. A powerful feature of SQL is that
it supports optimization. Users only need to care about the logic of the
program. The underlying optimizer will help users optimize the performance
of the program. However, in terms of functionality and ease of use, SQL is
limited in some scenarios, as described in Jincheng's proposal.

Correspondingly, the DataStream/DataSet APIs provide powerful
functionality. Users can write a ProcessFunction/CoProcessFunction and use
timers. Compared with SQL, they provide more functionality and
flexibility. However, they do not support optimization the way SQL does.
Meanwhile, the DataStream and DataSet APIs have not been unified, which means
that, for the same logic, users need to write one job for streaming and
another for batch.
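
As a rough illustration of the process-function style mentioned above (arbitrary per-record logic plus timers), here is a toy, Flink-free sketch; all names below are invented for illustration and do not match the real DataStream API:

```python
# Toy illustration of per-record logic + timers; not the real Flink API.

class ToyProcessFunction:
    def __init__(self):
        self.timers = []   # pending (fire_time, key) pairs
        self.output = []   # emitted records

    def process_element(self, key, value, timestamp):
        # arbitrary per-record logic: emit the record and register a timer
        self.output.append((key, value))
        self.timers.append((timestamp + 10, key))

    def advance_watermark(self, watermark):
        # fire every timer whose time has passed the watermark
        due = [t for t in self.timers if t[0] <= watermark]
        self.timers = [t for t in self.timers if t[0] > watermark]
        for fire_time, key in due:
            self.output.append((key, f"timer@{fire_time}"))

f = ToyProcessFunction()
f.process_element("k", 1, timestamp=100)
f.advance_watermark(115)   # the timer registered for time 110 fires
```

This kind of logic has no direct SQL equivalent, which is exactly the gap the proposal aims to close in the Table API.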

With the Table API, I think we can combine the advantages of both. Users can
easily write relational operations and enjoy optimization. At the same
time, it can offer more functionality and greater ease of use. Looking forward
to the detailed design/FLIP.

Best,
Hequn


Re: [DISCUSS] Enhancing the functionality and productivity of Table API

jincheng sun
In reply to this post by Becket Qin
Hi Jiangjie,
Thanks a lot for your feedback, and also thanks for our offline discussion!
Yes, you're right! The row-based APIs that you mentioned are very friendly
to Flink users!
To follow the concepts of traditional databases, perhaps naming the
corresponding functions RowValued/TableValued functions would be more
appropriate. Then, from the perspective of return values, we have three
types of functions in the Table API:

   - ColumnValuedFunction - a ScalarFunction or AggregateFunction; the
   result is a column.
   - RowValuedFunction - the MapFunction I will propose as a
   RowValuedFunction; the result is a single row.
   - TableValuedFunction - the FlatMapFunction I will propose as a
   TableValuedFunction; the result is a table.
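
A minimal sketch of the three return shapes, using plain Python stand-ins (the names mirror the proposal's terminology, not an existing Flink API):

```python
# Plain-Python stand-ins for the three proposed function kinds; rows are dicts.

def column_valued(rows, fn):
    # ScalarFunction-like: one value per input row -> a single column
    return [fn(r) for r in rows]

def row_valued(rows, fn):
    # Map-like: exactly one full output row per input row
    return [fn(r) for r in rows]

def table_valued(rows, fn):
    # FlatMap-like: zero or more output rows per input row -> a table
    return [out for r in rows for out in fn(r)]

rows = [{"x": 2}, {"x": 3}]
col = column_valued(rows, lambda r: r["x"] * 10)
row = row_valued(rows, lambda r: {"x": r["x"], "y": r["x"] + 1})
tab = table_valued(rows, lambda r: [{"x": r["x"]}] * r["x"])  # 2 + 3 = 5 rows
```

The only difference between the three is the shape of each function's result: a value, a row, or a set of rows.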

The details will be described in the upcoming FLIP/design doc.
Regarding the input type, I think we can support both column parameters and
row parameters. I believe the meaning you want to express is consistent with
mine, so we are on the same page, right?

And thank you, I am glad you like the proposal. I hope that we can work
together to advance this work!

Best,
Jincheng


Re: [DISCUSS] Enhancing the functionality and productivity of Table API

jincheng sun
In reply to this post by Hequn Cheng
Hi Hequn,
Thanks for your feedback, and also thanks for our offline discussion!
You are right, the unification of batch and streaming is very important for
the Flink API.
We will provide a more detailed design later. Please let me know if you have
further thoughts or feedback.

Thanks,
Jincheng

Hequn Cheng <[hidden email]> 于2018年11月2日周五 上午10:02写道:

> Hi Jincheng,
>
> Thanks a lot for your proposal. It is very encouraging!
>
> As we all know, SQL is a widely used language. It follows standards, is a
> descriptive language, and is easy to use. A powerful feature of SQL is that
> it supports optimization. Users only need to care about the logic of the
> program. The underlying optimizer will help users optimize the performance
> of the program. However, in terms of functionality and ease of use, in some
> scenarios sql will be limited, as described in Jincheng's proposal.
>
> Correspondingly, the DataStream/DataSet api can provide powerful
> functionalities. Users can write ProcessFunction/CoProcessFunction and get
> the timer. Compared with SQL, it provides more functionalities and
> flexibilities. However, it does not support optimization like SQL.
> Meanwhile, DataStream/DataSet api has not been unified which means, for the
> same logic, users need to write a job for each stream and batch.
>
> With TableApi, I think we can combine the advantages of both. Users can
> easily write relational operations and enjoy optimization. At the same
> time, it supports more functionality and ease of use. Looking forward to
> the detailed design/FLIP.
>
> Best,
> Hequn
>
> [...]

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

tison
Hi Jincheng,

Thanks a lot for your proposal! I find it is a good starting point for
internal optimization work and helps make Flink more
user-friendly.

AFAIK, DataStream is currently the most popular API, but Flink
users have to describe their logic with it in full detail.
From a more internal view, the conversion from DataStream to
JobGraph is quite mechanical and hard to optimize. So when
users program with DataStream, they have to learn more internals
and spend a lot of time tuning for performance.
With your proposal, we provide enhanced functionality in the Table API,
so that users can describe their job easily at the Table level. This gives
Flink developers an opportunity to introduce an optimization phase
while transforming a user program (described by the Table API) into its
internal representation.

A user who wants to start using Flink for simple ETL, pipelining,
or analytics would find such jobs most naturally described by SQL/Table
API. Further, as mentioned by @hequn,

> SQL is a widely used language. It follows standards, is a
> descriptive language, and is easy to use

thus we could expect that with the enhancement of the SQL/Table API, Flink
becomes friendlier to users.

Looking forward to the design doc/FLIP!

Best,
tison.
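[Editorial note: the optimization opportunity tison describes — a job expressed as relational operations is a data structure the system can rewrite, whereas an opaque user function is not — can be illustrated with a toy sketch. None of the names below are Flink API; everything here is invented purely for illustration.]

```python
# Toy sketch: a relational plan is an inspectable data structure, so an
# optimizer can rewrite it (here: fuse two consecutive projections into
# one) before execution. Not Flink code; for illustration only.

def select(plan, f):
    # Build a plan node instead of running f immediately.
    return ("select", f, plan)

def optimize(plan):
    # Rewrite select(g, select(f, src)) into a single select(g o f, src),
    # so execution makes one pass over the data instead of two.
    if plan[0] == "select" and plan[2][0] == "select":
        outer_f = plan[1]
        inner = optimize(plan[2])
        inner_f, src = inner[1], inner[2]
        return ("select", lambda row: outer_f(inner_f(row)), src)
    return plan

def execute(plan, rows):
    if plan[0] == "source":
        return list(rows)
    if plan[0] == "select":
        return [plan[1](r) for r in execute(plan[2], rows)]

plan = select(select(("source",), lambda r: r + 1), lambda r: r * 2)
optimized = optimize(plan)
print(execute(optimized, [1, 2, 3]))  # [4, 6, 8]
```

With an opaque `dataStream.map(fun)`, the system sees only a black-box function and has no equivalent rewrite opportunity — which is exactly the trade-off discussed in this thread.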


jincheng sun <[hidden email]> wrote on Fri, Nov 2, 2018 at 11:46 AM:

> Hi Hequn,
> Thanks for your feedback! And also thanks for our offline discussion!
> You are right, unification of batch and streaming is very important for
> flink API.
> We will provide more detailed design later, Please let me know if you have
> further thoughts or feedback.
>
> Thanks,
> Jincheng
>
> [...]

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

jincheng sun
Hi tison,

Thanks a lot for your feedback!
I am very happy to see that community contributors agree with enhancing
the Table API. This is a long-term, continuous effort that we will push
forward in stages; we will soon complete the list of enhancements for the
first phase, and we can then go into deep discussion in a Google doc.
Thanks again for joining this very important discussion of the Flink
Table API.

Thanks,
Jincheng

Tzu-Li Chen <[hidden email]> wrote on Fri, Nov 2, 2018 at 1:49 PM:

> [...]

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

Rong Rong
Hi Jincheng,

Thank you for the proposal! I think being able to define a process /
co-process function in the Table API definitely opens up a whole new level
of applications using a unified API.

In addition, as Tzu-Li and Hequn have mentioned, the optimization layer of
the Table API already brings an additional benefit over programming
directly on top of the DataStream/DataSet API. I am very interested in,
and looking forward to seeing, the support for more complex use cases,
especially iterations. It will enable the Table API to define much
broader, event-driven use cases such as real-time ML prediction/training.

As Timo mentioned, this will make the Table API diverge from the SQL API.
But in my experience the Table API has always given me the impression of
being a more sophisticated, syntax-aware way to express relational
operations. Looking forward to further discussion and collaboration on the
FLIP doc.

--
Rong
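[Editorial note: the map-style Table operation discussed in this thread — one user function producing a whole output row instead of one scalar UDF call per column — can be sketched with a toy table class. This is not the Flink Table API; all names below are invented for illustration.]

```python
# Toy sketch contrasting the two styles from the proposal:
#   select(): SQL-style, one scalar UDF per output column
#   map():    proposed style, one function returns the full output row
# Invented for illustration; not the actual Flink Table API.

class ToyTable:
    def __init__(self, rows):
        self.rows = rows  # list of dicts: column name -> value

    def select(self, **udfs):
        # One UDF call per output column; a 100-column result would
        # require 100 separate UDFs, as the proposal points out.
        return ToyTable([{name: udf(r) for name, udf in udfs.items()}
                         for r in self.rows])

    def map(self, fun):
        # A single function emits the whole output row at once.
        return ToyTable([fun(r) for r in self.rows])

t = ToyTable([{"a": 1, "b": 2}])
via_select = t.select(s=lambda r: r["a"] + r["b"],
                      d=lambda r: r["a"] - r["b"])
via_map = t.map(lambda r: {"s": r["a"] + r["b"], "d": r["a"] - r["b"]})
print(via_map.rows)  # [{'s': 3, 'd': -1}]
```

Both produce the same result, but the map() form scales to wide outputs with a single user function — the ease-of-use gap the proposal aims to close.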

On Sun, Nov 4, 2018 at 5:22 PM jincheng sun <[hidden email]>
wrote:

> [...]

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

Fabian Hueske-2
Hi Jincheng,

Thanks for this interesting proposal.
I like that we can push this effort forward in a very fine-grained manner,
i.e., incrementally adding more APIs to the Table API.

However, I also have a few questions / concerns.
Today, the Table API is tightly integrated with the DataSet and DataStream
APIs. It is very easy to convert a Table into a DataSet or DataStream and
vice versa. This means it is already easy to combine custom logic and
relational operations. What I like is that several aspects, like retraction
and timestamp handling (see below), are clearly separated, and all
libraries on DataStream/DataSet can easily be combined with relational
operations.
I can see that adding more functionality to the Table API would remove the
distinction between DataSet and DataStream. However, wouldn't we get a
similar benefit by extending the DataStream API with proper support for
bounded streams (as is the long-term goal of Flink)?
I'm also a bit skeptical about the optimization opportunities we would
gain. Map/FlatMap UDFs are black boxes that cannot be easily removed
without additional information (I did some research on this a few years ago
[1]).

Moreover, I think there are a few tricky details that need to be resolved
to enable a good integration.

1) How to deal with retraction messages? The DataStream API does not have a
notion of retractions. How would a MapFunction or FlatMapFunction handle
retraction? Do they need to be aware of the change flag? Custom windowing
and aggregation logic would certainly need to have that information.
2) How to deal with timestamps? The DataStream API does not give access to
timestamps. In the Table API / SQL these are exposed as regular attributes.
How can we ensure that timestamp attributes remain valid (i.e. aligned with
watermarks) if the output is produced by arbitrary code?
There might be more issues of this kind.
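To make question 1) concrete, here is a minimal, Flink-independent sketch of how a retract stream might be modeled as (is_add, row) pairs, and where a flag-unaware user function can go wrong. All names here are hypothetical illustrations, not actual Flink APIs:

```python
# Hypothetical model of a retract stream: each element carries a change flag.
# True = add (accumulate), False = retract (undo a previously added value).

def running_sum(retract_stream):
    """Consume (is_add, value) pairs and maintain a correct running sum."""
    total = 0
    for is_add, value in retract_stream:
        total += value if is_add else -value
    return total

def flag_unaware_map(retract_stream, fn):
    """A user map that ignores the flag and transforms only the value.
    This is safe only because fn is applied identically to adds and
    retracts; any stateful or side-effecting fn would be corrupted."""
    return [(is_add, fn(value)) for is_add, value in retract_stream]

# An update of a value from 3 to 5 is encoded as add(3), retract(3), add(5).
stream = [(True, 3), (False, 3), (True, 5)]

assert running_sum(stream) == 5

# fn(x) = x * x happens to be fine: retract(3) still cancels add(3) after
# mapping, because the same pure function is applied to both messages.
assert running_sum(flag_unaware_map(stream, lambda x: x * x)) == 25

# A stateful fn (e.g. a counter of "events seen") would instead count the
# retraction as a new event -- exactly the hazard raised above.
```

The point is that a MapFunction written against plain values has no way to distinguish the retraction from a genuine new record unless the change flag is part of its contract.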

My main question would be how much would we gain with this proposal over a
tight integration of Table API and DataStream API, assuming that batch
functionality is moved to DataStream?

Best, Fabian

[1] http://stratosphere.eu/assets/papers/openingTheBlackBoxes_12.pdf


On Mon, Nov 5, 2018 at 02:49, Rong Rong <[hidden email]> wrote:

> Hi Jincheng,
>
> Thank you for the proposal! I think being able to define a process /
> co-process function in table API definitely opens up a whole new level of
> applications using a unified API.
>
> > In addition, as Tzu-Li and Hequn have mentioned, the optimization layer
> > of the Table API will already bring in additional benefit over directly
> > programming on top of the DataStream/DataSet API. I am very interested in
> > and looking forward to seeing the support for more complex use cases,
> > especially iterations. It will enable the Table API to define much
> > broader, event-driven use cases such as real-time ML prediction/training.
> >
> > As Timo mentioned, this will make the Table API diverge from the SQL API.
> > But from my experience, the Table API was always giving me the impression
> > of being a more sophisticated, syntax-aware way to express relational
> > operations. Looking forward to further discussion and collaboration on the
> > FLIP doc.
>
> --
> Rong
>
> On Sun, Nov 4, 2018 at 5:22 PM jincheng sun <[hidden email]> wrote:
>
> > Hi tison,
> >
> > Thanks a lot for your feedback!
> > I am very happy to see that community contributors agree to enhance the
> > Table API. This is long-term, continuous work that we will push forward
> > in stages. We will soon complete the enhancement list for the first
> > phase, and we can then go into a deep discussion in the Google doc.
> > Thanks again for joining this very important discussion of the Flink
> > Table API.
> >
> > Thanks,
> > Jincheng
> >
> > Tzu-Li Chen <[hidden email]> wrote on Fri, Nov 2, 2018 at 1:49 PM:
> >
> > > Hi Jincheng,
> > >
> > > Thanks a lot for your proposal! I find it a good starting point for
> > > internal optimization work, and it will help Flink to be more
> > > user-friendly.
> > >
> > > AFAIK, DataStream is currently the most popular API, with which Flink
> > > users describe their logic in detail.
> > > From a more internal view, the conversion from DataStream to
> > > JobGraph is quite mechanical and hard to optimize. So when
> > > users program with DataStream, they have to learn more internals
> > > and spend a lot of time tuning for performance.
> > > With your proposal, we provide enhanced functionality in the Table API,
> > > so that users can describe their jobs easily at the Table level. This
> > > gives Flink developers an opportunity to introduce an optimization phase
> > > while transforming a user program (described with the Table API) into an
> > > internal representation.
> > >
> > > A user who wants to start using Flink for simple ETL, pipelining,
> > > or analytics would find such jobs most naturally described by the
> > > SQL/Table API. Further, as mentioned by @hequn,
> > >
> > > SQL is a widely used language. It follows standards, is a
> > > > descriptive language, and is easy to use
> > >
> > >
> > > thus we can expect that, with the enhancement of the SQL/Table API,
> > > Flink will become more friendly to users.
> > >
> > > Looking forward to the design doc/FLIP!
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > jincheng sun <[hidden email]> wrote on Fri, Nov 2, 2018 at 11:46 AM:
> > >
> > > > Hi Hequn,
> > > > Thanks for your feedback! And also thanks for our offline discussion!
> > > > You are right, unification of batch and streaming is very important
> > > > for the Flink API.
> > > > We will provide a more detailed design later. Please let me know if
> > > > you have further thoughts or feedback.
> > > >
> > > > Thanks,
> > > > Jincheng
> > > >
> > > > Hequn Cheng <[hidden email]> wrote on Fri, Nov 2, 2018 at 10:02 AM:
> > > >
> > > > > Hi Jincheng,
> > > > >
> > > > > Thanks a lot for your proposal. It is very encouraging!
> > > > >
> > > > > As we all know, SQL is a widely used language. It follows standards,
> > > > > is a descriptive language, and is easy to use. A powerful feature of
> > > > > SQL is that it supports optimization. Users only need to care about
> > > > > the logic of the program. The underlying optimizer will help users
> > > > > optimize the performance of the program. However, in terms of
> > > > > functionality and ease of use, SQL will be limited in some scenarios,
> > > > > as described in Jincheng's proposal.
> > > > >
> > > > > Correspondingly, the DataStream/DataSet API can provide powerful
> > > > > functionality. Users can write a ProcessFunction/CoProcessFunction
> > > > > and get access to timers. Compared with SQL, it provides more
> > > > > functionality and flexibility. However, it does not support
> > > > > optimization like SQL does. Meanwhile, the DataStream/DataSet APIs
> > > > > have not been unified, which means that for the same logic, users
> > > > > need to write one job for streaming and one for batch.
> > > > >
> > > > > With the Table API, I think we can combine the advantages of both.
> > > > > Users can easily write relational operations and enjoy optimization.
> > > > > At the same time, it supports more functionality and ease of use.
> > > > > Looking forward to the detailed design/FLIP.
> > > > >
> > > > > Best,
> > > > > Hequn
> > > > >
> > > > > On Fri, Nov 2, 2018 at 9:48 AM Shaoxuan Wang <[hidden email]> wrote:
> > > > >
> > > > > > Hi Aljoscha,
> > > > > > Glad that you like the proposal. We have completed a prototype of
> > > > > > most of the newly proposed functionalities. Once we collect the
> > > > > > feedback from the community, we will come up with a concrete
> > > > > > FLIP/design doc.
> > > > > >
> > > > > > Regards,
> > > > > > Shaoxuan
> > > > > >
> > > > > >
> > > > > > On Thu, Nov 1, 2018 at 8:12 PM Aljoscha Krettek <[hidden email]> wrote:
> > > > > >
> > > > > > > Hi Jincheng,
> > > > > > >
> > > > > > > these points sound very good! Are there any concrete proposals
> > > > > > > for changes? For example, a FLIP/design document?
> > > > > > >
> > > > > > > See here for FLIPs:
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > > > > >
> > > > > > > Best,
> > > > > > > Aljoscha
> > > > > > >

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

Xiaowei Jiang
Hi Fabian, these are great questions! I have some quick thoughts on some of
these.

Optimization opportunities: I think you are right that UDFs are more like
black boxes today. However, this can change if we let users develop UDFs
symbolically in the future (i.e., Flink will look inside the UDF code,
understand it, and potentially do codegen to execute it). This will open
the door for potential optimizations.
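A toy illustration of the "symbolic UDF" idea (all names hypothetical, with no relation to actual Flink internals): if a UDF is expressed as an inspectable expression tree rather than an opaque lambda, the planner can reason about it, for example to detect which input columns it reads and prune the rest:

```python
# A tiny expression language that an "optimizer" can look inside.
class Col:
    """A reference to a named input column."""
    def __init__(self, name):
        self.name = name
    def __add__(self, other):
        return Add(self, other)

class Add:
    """Symbolic addition of two sub-expressions."""
    def __init__(self, left, right):
        self.left, self.right = left, right

class Lit:
    """A literal constant."""
    def __init__(self, value):
        self.value = value

def referenced_columns(expr):
    """Walk the expression tree and collect the input columns it touches."""
    if isinstance(expr, Col):
        return {expr.name}
    if isinstance(expr, Add):
        return referenced_columns(expr.left) | referenced_columns(expr.right)
    return set()  # literals reference no input columns

# A symbolic UDF: the system can see it only needs columns "a" and "b",
# so all other columns could be pruned before the operator runs.
udf = Col("a") + (Col("b") + Lit(1))
assert referenced_columns(udf) == {"a", "b"}

# An opaque lambda gives the planner no such information without
# bytecode inspection:
black_box = lambda row: row["a"] + row["b"] + 1
```

The same symbolic representation is also what would make codegen possible, since the engine knows the full computation rather than just a function pointer.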

Moving batch functionality to DataStream: I actually think that moving
batch functionality to the Table API is probably a better idea. Currently,
the DataStream API is very deterministic and imperative. On the other hand,
the DataSet API has an optimizer and can choose very different execution
plans. Another distinguishing feature of the DataStream API is that users
get direct access to state/statebackends, which we intentionally avoided in
the Table API so far. The introduction of state is probably the biggest
innovation by Flink. At the same time, abusing state may also be the
largest source of reliability/performance issues. Shielding users from
dealing with state directly is a key advantage of the Table API. I think
this is probably a good boundary between DataStream and the Table API: if
users HAVE to manipulate state explicitly, go with DataStream; otherwise,
go with the Table API.
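The proposed boundary can be sketched without any Flink dependency (names are illustrative only): the same per-key count, written once imperatively with user-managed state, and once declaratively, where the engine owns the state:

```python
from collections import Counter

# Imperative style (DataStream-like): user code reads and writes its own
# keyed state, and is responsible for its lifecycle, size, and cleanup.
def count_per_key_imperative(events):
    state = {}  # user-managed state: powerful, but easy to abuse
    for key in events:
        state[key] = state.get(key, 0) + 1
    return state

# Declarative style (Table-like): the user states *what* to compute; how
# the state is kept, checkpointed, and cleaned up is the engine's concern.
def count_per_key_declarative(events):
    return dict(Counter(events))

events = ["a", "b", "a", "a"]
assert count_per_key_imperative(events) == {"a": 3, "b": 1}
assert count_per_key_declarative(events) == {"a": 3, "b": 1}
```

Both produce the same result; the difference is who controls the state, which is exactly the dividing line proposed above.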

Because Flink is extending into more and more scenarios (e.g., batch,
streaming & micro-service), we may inevitably end up with multiple APIs. It
appears that data analytics (batch & streaming) related applications can be
well served by the Table API, which not only unifies batch and streaming,
but also relieves users from dealing with state explicitly. On the other
hand, DataStream is very convenient and powerful for micro-service kind of
applications because explicit state access may be necessary in such cases.

We will start a thread outlining what we are proposing for the Table API.

Regards,
Xiaowei


Re: [DISCUSS] Enhancing the functionality and productivity of Table API

SHI Xiaogang
Hi all,

I think it's good to enhance the functionality and productivity of the Table
API, but I still think SQL + DataStream is a better choice from a
user-experience perspective:
1. The unification of batch and stream processing is very attractive, and
many of our users are moving their batch-processing applications to Flink.
These users are familiar with SQL, and most of their applications are
written in SQL-like languages. In ad-hoc or interactive scenarios, where
users write new queries based on the results of previous queries, SQL
is much more efficient than the Table API.
2. Though the Table API is also declarative, most users are reluctant to
learn another language, not to mention a fast-evolving one.
3. There is still much optimization work to be done for both the Table API
and SQL. Given that most data processing systems, including databases,
MapReduce-like systems, and continuous query systems, have done a lot of
work on optimizing SQL queries, it seems easier to adopt that existing
work in Flink via SQL than via the Table API.
4. The DataStream API is still welcome, as users can handle complex logic
with its flexible interfaces. It is also widely adopted in mission-critical
applications where users apply many "tricky" optimizations to achieve
optimal performance. Most of these tricks are simply not possible in the
Table API or SQL.

Regards,
Xiaogang

Xiaowei Jiang <[hidden email]> 于2018年11月5日周一 下午8:34写道:

> Hi Fabian, these are great questions! I have some quick thoughts on some of
> these.
>
> Optimization opportunities: I think that you are right UDFs are more like
> blackboxes today. However this can change if we let user develop UDFs
> symbolically in the future (i.e., Flink will look inside the UDF code,
> understand it and potentially do codegen to execute it). This will open the
> door for potential optimizations.
>
> Moving batch functionality to DataStream: I actually think that moving
> batch functionality to Table API is probably a better idea. Currently,
> DataStream API is very deterministic and imperative. On the other hand,
> DataSet API has an optimizer and can choose very different execution plans.
> Another distinguishing feature of DataStream API is that users get direct
> access to state/statebackend which we intensionally avoided in Table API so
> far. The introduction of states is probably the biggest innovation by
> Flink. At the same time, abusing states may also be the largest source for
> reliability/performance issues. Shielding users away from dealing with
> state directly is a key advantage in Table API. I think this is probably a
> good boundary between DataStream and Table API. If users HAVE to manipulate
> state explicitly, go with DataStream, otherwise, go with Table API.
>
> Because Flink is extending into more and more scenarios (e.g., batch,
> streaming & micro-service), we may inevitably end up with multiple APIs. It
> appears that data analytics (batch & streaming) related applications can be
> well served with Table API which not only unifies batch and streaming, but
> also relieves the users from dealing with states explicitly. On the other
> hand, DataStream is very convenient and powerful for micro-service kind of
> applications because explicitly state access may be necessary in such
> cases.
>
> We will start a threading outlining what we are proposing in Table API.
>
> Regards,
> Xiaowei
>
> On Mon, Nov 5, 2018 at 7:03 PM Fabian Hueske <[hidden email]> wrote:
>
> > Hi Jincheng,
> >
> > Thanks for this interesting proposal.
> > I like that we can push this effort forward in a very fine-grained
> manner,
> > i.e., incrementally adding more APIs to the Table API.
> >
> > However, I also have a few questions / concerns.
> > Today, the Table API is tightly integrated with the DataSet and
> DataStream
> > APIs. It is very easy to convert a Table into a DataSet or DataStream and
> > vice versa. This mean it is already easy to combine custom logic an
> > relational operations. What I like is that several aspects are clearly
> > separated like retraction and timestamp handling (see below) + all
> > libraries on DataStream/DataSet can be easily combined with relational
> > operations.
> > I can see that adding more functionality to the Table API would remove
> the
> > distinction between DataSet and DataStream. However, wouldn't we get a
> > similar benefit by extending the DataStream API for proper support for
> > bounded streams (as is the long-term goal of Flink)?
> > I'm also a bit skeptical about the optimization opportunities we would
> > gain. Map/FlatMap UDFs are black boxes that cannot be easily removed
> > without additional information (I did some research on this a few years
> ago
> > [1]).
> >
> > Moreover, I think there are a few tricky details that need to be resolved
> > to enable a good integration.
> >
> > 1) How to deal with retraction messages? The DataStream API does not
> have a
> > notion of retractions. How would a MapFunction or FlatMapFunction handle
> > retraction? Do they need to be aware of the change flag? Custom windowing
> > and aggregation logic would certainly need to have that information.
> > 2) How to deal with timestamps? The DataStream API does not give access
> to
> > timestamps. In the Table API / SQL these are exposed as regular
> attributes.
> > How can we ensure that timestamp attributes remain valid (i.e. aligned
> with
> > watermarks) if the output is produced by arbitrary code?
> > There might be more issues of this kind.
> >
> > My main question would be how much would we gain with this proposal over
> a
> > tight integration of Table API and DataStream API, assuming that batch
> > functionality is moved to DataStream?
> >
> > Best, Fabian
> >
> > [1] http://stratosphere.eu/assets/papers/openingTheBlackBoxes_12.pdf
> >
> >
> > Am Mo., 5. Nov. 2018 um 02:49 Uhr schrieb Rong Rong <[hidden email]
> >:
> >
> > > Hi Jincheng,
> > >
> > > Thank you for the proposal! I think being able to define a process /
> > > co-process function in table API definitely opens up a whole new level
> of
> > > applications using a unified API.
> > >
> > > In addition, as Tzu-Li and Hequn have mentioned, the benefit of
> > > optimization layer of Table API will already bring in additional
> benefit
> > > over directly programming on top of DataStream/DataSet API. I am very
> > > interested an looking forward to seeing the support for more complex
> use
> > > cases, especially iterations. It will enable table API to define much
> > > broader, event-driven use cases such as real-time ML
> prediction/training.
> > >
> > > As Timo mentioned, This will make Table API diverge from the SQL API.
> But
> > > as from my experience Table API was always giving me the impression to
> > be a
> > > more sophisticated, syntactic-aware way to express relational
> operations.
> > > Looking forward to further discussion and collaborations on the FLIP
> doc.
> > >
> > > --
> > > Rong
> > >
> > > On Sun, Nov 4, 2018 at 5:22 PM jincheng sun <[hidden email]>
> > > wrote:
> > >
> > > > Hi tison,
> > > >
> > > > Thanks a lot for your feedback!
> > > > I am very happy to see that community contributors agree to enhanced
> > the
> > > > TableAPI. This work is a long-term continuous work, we will push it
> in
> > > > stages, we will soon complete  the enhanced list of the first phase,
> we
> > > can
> > > > go deep discussion  in google doc. thanks again for joining on the
> very
> > > > important discussion of the Flink Table API.
> > > >
> > > > Thanks,
> > > > Jincheng
> > > >
> > > > Tzu-Li Chen <[hidden email]> 于2018年11月2日周五 下午1:49写道:
> > > >
> > > > > Hi jingchengm
> > > > >
> > > > > Thanks a lot for your proposal! I find it is a good start point for
> > > > > internal optimization works and help Flink to be more
> > > > > user-friendly.
> > > > >
> > > > > AFAIK, DataStream is the most popular API currently that Flink
> > > > > users should describe their logic with detailed logic.
> > > > > From a more internal view the conversion from DataStream to
> > > > > JobGraph is quite mechanically and hard to be optimized. So when
> > > > > users program with DataStream, they have to learn more internals
> > > > > and spend a lot of time to tune for performance.
> > > > > With your proposal, we provide enhanced functionality of Table API,
> > > > > so that users can describe their job easily on Table aspect. This
> > gives
> > > > > an opportunity to Flink developers to introduce an optimize phase
> > > > > while transforming user program(described by Table API) to internal
> > > > > representation.
> > > > >
> > > > > Given a user who want to start using Flink with simple ETL,
> > pipelining
> > > > > or analytics, he would find it is most naturally described by
> > SQL/Table
> > > > > API. Further, as mentioned by @hequn,
> > > > >
> > > > > SQL is a widely used language. It follows standards, is a
> > > > > > descriptive language, and is easy to use
> > > > >
> > > > >
> > > > > thus we could expect with the enhancement of SQL/Table API, Flink
> > > > > becomes more friendly to users.
> > > > >
> > > > > Looking forward to the design doc/FLIP!
> > > > >
> > > > > Best,
> > > > > tison.
> > > > >
> > > > >
> > > > > jincheng sun <[hidden email]> 于2018年11月2日周五 上午11:46写道:
> > > > >
> > > > > > Hi Hequn,
> > > > > > Thanks for your feedback! And also thanks for our offline
> > discussion!
> > > > > > You are right, unification of batch and streaming is very
> important
> > > for
> > > > > > flink API.
> > > > > > We will provide more detailed design later, Please let me know if
> > you
> > > > > have
> > > > > > further thoughts or feedback.
> > > > > >
> > > > > > Thanks,
> > > > > > Jincheng
> > > > > >
> > > > > > Hequn Cheng <[hidden email]> 于2018年11月2日周五 上午10:02写道:
> > > > > >
> > > > > > > Hi Jincheng,
> > > > > > >
> > > > > > > Thanks a lot for your proposal. It is very encouraging!
> > > > > > >
> > > > > > > As we all know, SQL is a widely used language. It follows
> > > standards,
> > > > > is a
> > > > > > > descriptive language, and is easy to use. A powerful feature of
> > SQL
> > > > is
> > > > > > that
> > > > > > > it supports optimization. Users only need to care about the
> logic
> > > of
> > > > > the
> > > > > > > program. The underlying optimizer will help users optimize the
> > > > > > performance
> > > > > > > of the program. However, in terms of functionality and ease of
> > use,
> > > > in
> > > > > > some
> > > > > > > scenarios sql will be limited, as described in Jincheng's
> > proposal.
> > > > > > >
> > > > > > > Correspondingly, the DataStream/DataSet api can provide
> powerful
> > > > > > > functionalities. Users can write
> > ProcessFunction/CoProcessFunction
> > > > and
> > > > > > get
> > > > > > > the timer. Compared with SQL, it provides more functionalities
> > and
> > > > > > > flexibilities. However, it does not support optimization like
> > SQL.
> > > > > > > Meanwhile, DataStream/DataSet api has not been unified which
> > means,
> > > > for
> > > > > > the
> > > > > > > same logic, users need to write a job for each stream and
> batch.
> > > > > > >
> > > > > > > With TableApi, I think we can combine the advantages of both.
> > Users
> > > > can
> > > > > > > easily write relational operations and enjoy optimization. At
> the
> > > > same
> > > > > > > time, it supports more functionality and ease of use. Looking
> > > forward
> > > > > to
> > > > > > > the detailed design/FLIP.
> > > > > > >
> > > > > > > Best,
> > > > > > > Hequn
> > > > > > >
> > > > > > > On Fri, Nov 2, 2018 at 9:48 AM Shaoxuan Wang <
> > [hidden email]>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Aljoscha,
> > > > > > > > Glad that you like the proposal. We have completed the
> > prototype
> > > of
> > > > > > most
> > > > > > > > new proposed functionalities. Once collect the feedback from
> > > > > community,
> > > > > > > we
> > > > > > > > will come up with a concrete FLIP/design doc.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Shaoxuan
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Nov 1, 2018 at 8:12 PM Aljoscha Krettek <
> > > > [hidden email]
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jincheng,
> > > > > > > > >
> > > > > > > > > these points sound very good! Are there any concrete
> > proposals
> > > > for
> > > > > > > > > changes? For example a FLIP/design document?
> > > > > > > > >
> > > > > > > > > See here for FLIPs:
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Aljoscha
> > > > > > > > >
> > > > > > > > > > On 1. Nov 2018, at 12:51, jincheng sun <
> > > > [hidden email]
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > *--------I am sorry for the formatting of the email
> > content.
> > > I
> > > > > > > reformat
> > > > > > > > > > the **content** as follows-----------*
> > > > > > > > > >
> > > > > > > > > > *Hi ALL,*
> > > > > > > > > >
> > > > > > > > > > With the continuous efforts from the community, the Flink
> > > > system
> > > > > > has
> > > > > > > > been
> > > > > > > > > > continuously improved, which has attracted more and more
> > > users.
> > > > > > Flink
> > > > > > > > SQL
> > > > > > > > > > is a canonical, widely used relational query language.
> > > However,
> > > > > > there
> > > > > > > > are
> > > > > > > > > > still some scenarios where Flink SQL failed to meet user
> > > needs
> > > > in
> > > > > > > terms
> > > > > > > > > of
> > > > > > > > > > functionality and ease of use, such as:
> > > > > > > > > >
> > > > > > > > > > *1. In terms of functionality*
> > > > > > > > > >    Iteration, user-defined window, user-defined join,
> > > > > user-defined
> > > > > > > > > > GroupReduce, etc. Users cannot express them with SQL;
> > > > > > > > > >
> > > > > > > > > > *2. In terms of ease of use*
> > > > > > > > > >
> > > > > > > > > >   - Map - e.g. “dataStream.map(mapFun)”. Although
> > > > > > > “table.select(udf1(),
> > > > > > > > > >   udf2(), udf3()....)” can be used to accomplish the same
> > > > > > function.,
> > > > > > > > > with a
> > > > > > > > > >   map() function returning 100 columns, one has to define
> > or
> > > > call
> > > > > > 100
> > > > > > > > > UDFs
> > > > > > > > > >   when using SQL, which is quite involved.
> > > > > > > > > >   - FlatMap -  e.g. “dataStrem.flatmap(flatMapFun)”.
> > > Similarly,
> > > > > it
> > > > > > > can
> > > > > > > > be
> > > > > > > > > >   implemented with “table.join(udtf).select()”. However,
> it
> > > is
> > > > > > > obvious
> > > > > > > > > that
> > > > > > > > > >   dataStream is easier to use than SQL.
> > > > > > > > > >
> > > > > > > > > > Due to the above two reasons, some users have to use the
> > > > > DataStream
> > > > > > > API
> > > > > > > > > or
> > > > > > > > > > the DataSet API. But when they do that, they lose the
> > > > unification
> > > > > > of
> > > > > > > > > batch
> > > > > > > > > > and streaming. They will also lose the sophisticated
> > > > > optimizations
> > > > > > > such
> > > > > > > > > as
> > > > > > > > > > codegen, aggregate join transpose and multi-stage agg
> from
> > > > Flink
> > > > > > SQL.
> > > > > > > > > >
> > > > > > > > > > We believe that enhancing the functionality and
> > productivity
> > > is
> > > > > > vital
> > > > > > > > for
> > > > > > > > > > the successful adoption of Table API. To this end,  Table
> > API
> > > > > still
> > > > > > > > > > requires more efforts from every contributor in the
> > > community.
> > > > We
> > > > > > see
> > > > > > > > > great
> > > > > > > > > > opportunity in improving our user’s experience from this
> > > work.
> > > > > Any
> > > > > > > > > feedback
> > > > > > > > > > is welcome.
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > >
> > > > > > > > > > Jincheng
> > > > > > > > > >
> > > > > > > > > > jincheng sun <[hidden email]> 于2018年11月1日周四
> > > > 下午5:07写道:
> > > > > > > > > >
> > > > > > > > > >> Hi all,
> > > > > > > > > >>
> > > > > > > > > >> With the continuous efforts from the community, the
> Flink
> > > > system
> > > > > > has
> > > > > > > > > been
> > > > > > > > > >> continuously improved, which has attracted more and more
> > > > users.
> > > > > > > Flink
> > > > > > > > > SQL
> > > > > > > > > >> is a canonical, widely used relational query language.
> > > > However,
> > > > > > > there
> > > > > > > > > are
> > > > > > > > > >> still some scenarios where Flink SQL failed to meet user
> > > needs
> > > > > in
> > > > > > > > terms
> > > > > > > > > of
> > > > > > > > > >> functionality and ease of use, such as:
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >>   -
> > > > > > > > > >>
> > > > > > > > > >>   In terms of functionality
> > > > > > > > > >>
> > > > > > > > > >> Iteration, user-defined window, user-defined join,
> > > > user-defined
> > > > > > > > > >> GroupReduce, etc. Users cannot express them with SQL;
> > > > > > > > > >>
> > > > > > > > > >>   -
> > > > > > > > > >>
> > > > > > > > > >>   In terms of ease of use
> > > > > > > > > >>   -
> > > > > > > > > >>
> > > > > > > > > >>      Map - e.g. “dataStream.map(mapFun)”. Although
> > > > > > > > “table.select(udf1(),
> > > > > > > > > >>      udf2(), udf3()....)” can be used to accomplish the
> > same
> > > > > > > > function.,
> > > > > > > > > with a
> > > > > > > > > >>      map() function returning 100 columns, one has to
> > define
> > > > or
> > > > > > call
> > > > > > > > > 100 UDFs
> > > > > > > > > >>      when using SQL, which is quite involved.
> > > > > > > > > >>      -
> > > > > > > > > >>
> > > > > > > > > >>      FlatMap -  e.g. “dataStrem.flatmap(flatMapFun)”.
> > > > Similarly,
> > > > > > it
> > > > > > > > can
> > > > > > > > > >>      be implemented with “table.join(udtf).select()”.
> > > However,
> > > > > it
> > > > > > is
> > > > > > > > > obvious
> > > > > > > > > >>      that datastream is easier to use than SQL.
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> Due to the above two reasons, some users have to use the
> > > > > > DataStream
> > > > > > > > API
> > > > > > > > > or
> > > > > > > > > >> the DataSet API. But when they do that, they lose the
> > > > > unification
> > > > > > of
> > > > > > > > > batch
> > > > > > > > > >> and streaming. They will also lose the sophisticated
> > > > > optimizations
> > > > > > > > such
> > > > > > > > > as
> > > > > > > > > >> codegen, aggregate join transpose  and multi-stage agg
> > from
> > > > > Flink
> > > > > > > SQL.
> > > > > > > > > >>
> > > > > > > > > >> We believe that enhancing the functionality and
> > productivity
> > > > is
> > > > > > > vital
> > > > > > > > > for
> > > > > > > > > >> the successful adoption of Table API. To this end,
> Table
> > > API
> > > > > > still
> > > > > > > > > >> requires more efforts from every contributor in the
> > > community.
> > > > > We
> > > > > > > see
> > > > > > > > > great
> > > > > > > > > >> opportunity in improving our user’s experience from this
> > > work.
> > > > > Any
> > > > > > > > > feedback
> > > > > > > > > >> is welcome.
> > > > > > > > > >>
> > > > > > > > > >> Regards,
> > > > > > > > > >>
> > > > > > > > > >> Jincheng
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

jincheng sun
Hi Xiaogang,

Thanks for your feedback. I will share my thoughts here:

First, enhancing the Table API does not mean weakening SQL. We also need to
enhance the functionality of SQL, for example through @Xuefu's ongoing
integration of the Hive SQL ecosystem.

In addition, SQL and the Table API are two different forms of Apache
Flink's high-level APIs, and users can choose between them according to
their own preferences and habits.

Regarding #3: SQL and the Table API share the same optimization framework,
so both can reuse the optimization rules developed for traditional
databases.

Regarding #4: You are right. Whether we enhance the Table API or not, we
have to retain the DataStream API, because DataStream provides users with
sufficient control.

However, keeping Flink SQL compliant with the ANSI SQL standard is worth
insisting on, and ANSI SQL cannot conveniently express operations such as
map/flatMap. So we want to add map/flatMap to the Table API, to enhance its
ease of use and improve the overall Flink user experience. What do you think?
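
To make the map/flatMap point concrete, here is a minimal sketch contrasting
today's Table API with the *proposed* enhancement. Note that this is purely
illustrative: `map`/`flatMap` on `Table` do not exist yet, and the
function-class names and signatures below are assumptions, not a final design.

```scala
// Today: projecting many derived columns means defining and calling one
// scalar UDF per output column.
val wide: Table = input.select(udf1('a), udf2('a, 'b), /* ... */ udf100('z))

// Proposed (hypothetical): a single map() producing all output columns
// in one user-defined function.
class ExpandRecord extends MapFunction[Row, Row] {
  override def map(value: Row): Row = {
    // compute all 100 output fields in one place
    // ...
  }
}
val wide2: Table = input.map(new ExpandRecord)

// Similarly, flatMap() would replace today's join-with-UDTF workaround:
//   input.join(split('line)).select('word)
// with the more direct:
//   input.flatMap(new Tokenize)
```

The intent is ergonomic, not semantic: everything above is already expressible
with UDFs/UDTFs, but a single function avoids registering many UDFs while
still passing through the Table API optimizer.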

Thanks,
Jincheng

SHI Xiaogang <[hidden email]> 于2018年11月6日周二 上午11:28写道:

> Hi all,
>
> I think it's good to enhance the functionality and productivity of Table
> API, but still I think SQL + DataStream is a better choice from user
> experience
> 1. The unification of batch and stream processing is very attractive, and
> many our users are moving their batch-processing applications to Flink.
> These users are familiar with SQL and most of their applications are
> written in SQL-like languages. In Ad-hoc or interactive scenarios where
> users write new queries according to the results or previous queries, SQL
> is much more efficient than Table API.
> 2. Though TableAPI is also declarative, most users typically refuse to
> learn another language, not to mention a fast-evolving one.
> 3. There are many efforts to be done in the optimization of both Table API
> and SQL. Given that most data processing systems, including Databases,
> MapReduce-like systems and Continuous Query systems, have done a lot of
> work in the optimization of SQL queries, it seems more easier to adopt
> existing work in Flink with SQL than TableAPI.
> 4. Data stream is still welcome as users can deal with complex logic with
> flexible interfaces. It is also widely adopted in critical missions where
> users can use a lot of "tricky" optimization to achieve optimal
> performance. Most of these tricks typically are not allowed in TableAPI or
> SQL.
>
> Regards,
> Xiaogang
>
> Xiaowei Jiang <[hidden email]> 于2018年11月5日周一 下午8:34写道:
>
> > Hi Fabian, these are great questions! I have some quick thoughts on some
> of
> > these.
> >
> > Optimization opportunities: I think that you are right UDFs are more like
> > blackboxes today. However this can change if we let user develop UDFs
> > symbolically in the future (i.e., Flink will look inside the UDF code,
> > understand it and potentially do codegen to execute it). This will open
> the
> > door for potential optimizations.
> >
> > Moving batch functionality to DataStream: I actually think that moving
> > batch functionality to Table API is probably a better idea. Currently,
> > DataStream API is very deterministic and imperative. On the other hand,
> > DataSet API has an optimizer and can choose very different execution
> plans.
> > Another distinguishing feature of DataStream API is that users get direct
> > access to state/statebackend which we intensionally avoided in Table API
> so
> > far. The introduction of states is probably the biggest innovation by
> > Flink. At the same time, abusing states may also be the largest source
> for
> > reliability/performance issues. Shielding users away from dealing with
> > state directly is a key advantage in Table API. I think this is probably
> a
> > good boundary between DataStream and Table API. If users HAVE to
> manipulate
> > state explicitly, go with DataStream, otherwise, go with Table API.
> >
> > Because Flink is extending into more and more scenarios (e.g., batch,
> > streaming & micro-service), we may inevitably end up with multiple APIs.
> It
> > appears that data analytics (batch & streaming) related applications can
> be
> > well served with Table API which not only unifies batch and streaming,
> but
> > also relieves the users from dealing with states explicitly. On the other
> > hand, DataStream is very convenient and powerful for micro-service kind
> of
> > applications because explicitly state access may be necessary in such
> > cases.
> >
> > We will start a threading outlining what we are proposing in Table API.
> >
> > Regards,
> > Xiaowei
> >
> > On Mon, Nov 5, 2018 at 7:03 PM Fabian Hueske <[hidden email]> wrote:
> >
> > > Hi Jincheng,
> > >
> > > Thanks for this interesting proposal.
> > > I like that we can push this effort forward in a very fine-grained
> > manner,
> > > i.e., incrementally adding more APIs to the Table API.
> > >
> > > However, I also have a few questions / concerns.
> > > Today, the Table API is tightly integrated with the DataSet and
> > DataStream
> > > APIs. It is very easy to convert a Table into a DataSet or DataStream
> and
> > > vice versa. This mean it is already easy to combine custom logic an
> > > relational operations. What I like is that several aspects are clearly
> > > separated like retraction and timestamp handling (see below) + all
> > > libraries on DataStream/DataSet can be easily combined with relational
> > > operations.
> > > I can see that adding more functionality to the Table API would remove
> > the
> > > distinction between DataSet and DataStream. However, wouldn't we get a
> > > similar benefit by extending the DataStream API for proper support for
> > > bounded streams (as is the long-term goal of Flink)?
> > > I'm also a bit skeptical about the optimization opportunities we would
> > > gain. Map/FlatMap UDFs are black boxes that cannot be easily removed
> > > without additional information (I did some research on this a few years
> > ago
> > > [1]).
> > >
> > > Moreover, I think there are a few tricky details that need to be
> resolved
> > > to enable a good integration.
> > >
> > > 1) How to deal with retraction messages? The DataStream API does not
> > have a
> > > notion of retractions. How would a MapFunction or FlatMapFunction
> handle
> > > retraction? Do they need to be aware of the change flag? Custom
> windowing
> > > and aggregation logic would certainly need to have that information.
> > > 2) How to deal with timestamps? The DataStream API does not give access
> > to
> > > timestamps. In the Table API / SQL these are exposed as regular
> > attributes.
> > > How can we ensure that timestamp attributes remain valid (i.e. aligned
> > with
> > > watermarks) if the output is produced by arbitrary code?
> > > There might be more issues of this kind.
> > >
> > > My main question would be how much would we gain with this proposal
> over
> > a
> > > tight integration of Table API and DataStream API, assuming that batch
> > > functionality is moved to DataStream?
> > >
> > > Best, Fabian
> > >
> > > [1] http://stratosphere.eu/assets/papers/openingTheBlackBoxes_12.pdf
> > >
> > >
> > > Am Mo., 5. Nov. 2018 um 02:49 Uhr schrieb Rong Rong <
> [hidden email]
> > >:
> > >
> > > > Hi Jincheng,
> > > >
> > > > Thank you for the proposal! I think being able to define a process /
> > > > co-process function in table API definitely opens up a whole new
> level
> > of
> > > > applications using a unified API.
> > > >
> > > > In addition, as Tzu-Li and Hequn have mentioned, the benefit of
> > > > optimization layer of Table API will already bring in additional
> > benefit
> > > > over directly programming on top of DataStream/DataSet API. I am very
> > > > interested an looking forward to seeing the support for more complex
> > use
> > > > cases, especially iterations. It will enable table API to define much
> > > > broader, event-driven use cases such as real-time ML
> > prediction/training.
> > > >
> > > > As Timo mentioned, This will make Table API diverge from the SQL API.
> > But
> > > > as from my experience Table API was always giving me the impression
> to
> > > be a
> > > > more sophisticated, syntactic-aware way to express relational
> > operations.
> > > > Looking forward to further discussion and collaborations on the FLIP
> > doc.
> > > >
> > > > --
> > > > Rong
> > > >
> > > > On Sun, Nov 4, 2018 at 5:22 PM jincheng sun <
> [hidden email]>
> > > > wrote:
> > > >
> > > > > Hi tison,
> > > > >
> > > > > Thanks a lot for your feedback!
> > > > > I am very happy to see that community contributors agree to
> enhanced
> > > the
> > > > > TableAPI. This work is a long-term continuous work, we will push it
> > in
> > > > > stages, we will soon complete  the enhanced list of the first
> phase,
> > we
> > > > can
> > > > > go deep discussion  in google doc. thanks again for joining on the
> > very
> > > > > important discussion of the Flink Table API.
> > > > >
> > > > > Thanks,
> > > > > Jincheng
> > > > >
> > > > > Tzu-Li Chen <[hidden email]> 于2018年11月2日周五 下午1:49写道:
> > > > >
> > > > > > Hi jingchengm
> > > > > >
> > > > > > Thanks a lot for your proposal! I find it is a good start point
> for
> > > > > > internal optimization works and help Flink to be more
> > > > > > user-friendly.
> > > > > >
> > > > > > AFAIK, DataStream is the most popular API currently that Flink
> > > > > > users should describe their logic with detailed logic.
> > > > > > From a more internal view the conversion from DataStream to
> > > > > > JobGraph is quite mechanically and hard to be optimized. So when
> > > > > > users program with DataStream, they have to learn more internals
> > > > > > and spend a lot of time to tune for performance.
> > > > > > With your proposal, we provide enhanced functionality of Table
> API,
> > > > > > so that users can describe their job easily on Table aspect. This
> > > gives
> > > > > > an opportunity to Flink developers to introduce an optimize phase
> > > > > > while transforming user program(described by Table API) to
> internal
> > > > > > representation.
> > > > > >
> > > > > > Given a user who want to start using Flink with simple ETL,
> > > pipelining
> > > > > > or analytics, he would find it is most naturally described by
> > > SQL/Table
> > > > > > API. Further, as mentioned by @hequn,
> > > > > >
> > > > > > SQL is a widely used language. It follows standards, is a
> > > > > > > descriptive language, and is easy to use
> > > > > >
> > > > > >
> > > > > > thus we could expect with the enhancement of SQL/Table API, Flink
> > > > > > becomes more friendly to users.
> > > > > >
> > > > > > Looking forward to the design doc/FLIP!
> > > > > >
> > > > > > Best,
> > > > > > tison.
> > > > > >
> > > > > >
> > > > > > jincheng sun <[hidden email]> 于2018年11月2日周五 上午11:46写道:
> > > > > >
> > > > > > > Hi Hequn,
> > > > > > > Thanks for your feedback! And also thanks for our offline
> > > discussion!
> > > > > > > You are right, unification of batch and streaming is very
> > important
> > > > for
> > > > > > > flink API.
> > > > > > > We will provide more detailed design later, Please let me know
> if
> > > you
> > > > > > have
> > > > > > > further thoughts or feedback.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Jincheng
> > > > > > >
> > > > > > > Hequn Cheng <[hidden email]> 于2018年11月2日周五 上午10:02写道:
> > > > > > >
> > > > > > > > Hi Jincheng,
> > > > > > > >
> > > > > > > > Thanks a lot for your proposal. It is very encouraging!
> > > > > > > >
> > > > > > > > As we all know, SQL is a widely used language. It follows
> > > > standards,
> > > > > > is a
> > > > > > > > descriptive language, and is easy to use. A powerful feature
> of
> > > SQL
> > > > > is
> > > > > > > that
> > > > > > > > it supports optimization. Users only need to care about the
> > logic
> > > > of
> > > > > > the
> > > > > > > > program. The underlying optimizer will help users optimize
> the
> > > > > > > performance
> > > > > > > > of the program. However, in terms of functionality and ease
> of
> > > use,
> > > > > in
> > > > > > > some
> > > > > > > > scenarios SQL will be limited, as described in Jincheng's
> > > proposal.
> > > > > > > >
> > > > > > > > Correspondingly, the DataStream/DataSet api can provide
> > powerful
> > > > > > > > functionalities. Users can write
> > > ProcessFunction/CoProcessFunction
> > > > > and
> > > > > > > get
> > > > > > > > the timer. Compared with SQL, it provides more
> functionalities
> > > and
> > > > > > > > flexibilities. However, it does not support optimization like
> > > SQL.
> > > > > > > > Meanwhile, DataStream/DataSet api has not been unified which
> > > means,
> > > > > for
> > > > > > > the
> > > > > > > > same logic, users need to write a job for each stream and
> > batch.
> > > > > > > >
> > > > > > > > With TableApi, I think we can combine the advantages of both.
> > > Users
> > > > > can
> > > > > > > > easily write relational operations and enjoy optimization. At
> > the
> > > > > same
> > > > > > > > time, it supports more functionality and ease of use. Looking
> > > > forward
> > > > > > to
> > > > > > > > the detailed design/FLIP.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Hequn
> > > > > > > >
> > > > > > > > On Fri, Nov 2, 2018 at 9:48 AM Shaoxuan Wang <
> > > [hidden email]>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Aljoscha,
> > > > > > > > > Glad that you like the proposal. We have completed the
> > > prototype
> > > > of
> > > > > > > most
> > > > > > > > > new proposed functionalities. Once we collect the feedback
> > > > > > > > > from the community, we will come up with a concrete
> > > > > > > > > FLIP/design doc.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Shaoxuan
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Nov 1, 2018 at 8:12 PM Aljoscha Krettek <
> > > > > [hidden email]
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jincheng,
> > > > > > > > > >
> > > > > > > > > > these points sound very good! Are there any concrete
> > > proposals
> > > > > for
> > > > > > > > > > changes? For example a FLIP/design document?
> > > > > > > > > >
> > > > > > > > > > See here for FLIPs:
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Aljoscha
> > > > > > > > > >
> > > > > > > > > > > On 1. Nov 2018, at 12:51, jincheng sun <
> > > > > [hidden email]
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > *--------I am sorry for the formatting of the email
> > > content.
> > > > I
> > > > > > > > reformat
> > > > > > > > > > > the **content** as follows-----------*
> > > > > > > > > > >
> > > > > > > > > > > *Hi ALL,*
> > > > > > > > > > >
> > > > > > > > > > > With the continuous efforts from the community, the
> Flink
> > > > > system
> > > > > > > has
> > > > > > > > > been
> > > > > > > > > > > continuously improved, which has attracted more and
> more
> > > > users.
> > > > > > > Flink
> > > > > > > > > SQL
> > > > > > > > > > > is a canonical, widely used relational query language.
> > > > However,
> > > > > > > there
> > > > > > > > > are
> > > > > > > > > > > still some scenarios where Flink SQL failed to meet
> user
> > > > needs
> > > > > in
> > > > > > > > terms
> > > > > > > > > > of
> > > > > > > > > > > functionality and ease of use, such as:
> > > > > > > > > > >
> > > > > > > > > > > *1. In terms of functionality*
> > > > > > > > > > >    Iteration, user-defined window, user-defined join,
> > > > > > user-defined
> > > > > > > > > > > GroupReduce, etc. Users cannot express them with SQL;
> > > > > > > > > > >
> > > > > > > > > > > *2. In terms of ease of use*
> > > > > > > > > > >
> > > > > > > > > > >   - Map - e.g. “dataStream.map(mapFun)”. Although
> > > > > > > > > > >   “table.select(udf1(), udf2(), udf3()....)” can be used
> > > > > > > > > > >   to accomplish the same function, with a map() function
> > > > > > > > > > >   returning 100 columns, one has to define or call 100
> > > > > > > > > > >   UDFs when using SQL, which is quite involved.
> > > > > > > > > > >   - FlatMap - e.g. “dataStream.flatmap(flatMapFun)”.
> > > > > > > > > > >   Similarly, it can be implemented with
> > > > > > > > > > >   “table.join(udtf).select()”. However, it is obvious
> > > > > > > > > > >   that dataStream is easier to use than SQL.
> > > > > > > > > > >
> > > > > > > > > > > Due to the above two reasons, some users have to use
> the
> > > > > > DataStream
> > > > > > > > API
> > > > > > > > > > or
> > > > > > > > > > > the DataSet API. But when they do that, they lose the
> > > > > unification
> > > > > > > of
> > > > > > > > > > batch
> > > > > > > > > > > and streaming. They will also lose the sophisticated
> > > > > > optimizations
> > > > > > > > such
> > > > > > > > > > as
> > > > > > > > > > > codegen, aggregate join transpose and multi-stage agg
> > from
> > > > > Flink
> > > > > > > SQL.
> > > > > > > > > > >
> > > > > > > > > > > We believe that enhancing the functionality and
> > > productivity
> > > > is
> > > > > > > vital
> > > > > > > > > for
> > > > > > > > > > > the successful adoption of Table API. To this end,
> Table
> > > API
> > > > > > still
> > > > > > > > > > > requires more efforts from every contributor in the
> > > > community.
> > > > > We
> > > > > > > see
> > > > > > > > > > great
> > > > > > > > > > > opportunity in improving our user’s experience from
> this
> > > > work.
> > > > > > Any
> > > > > > > > > > feedback
> > > > > > > > > > > is welcome.
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > >
> > > > > > > > > > > Jincheng
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Enhancing the functionality and productivity of Table API

Becket Qin
In reply to this post by SHI Xiaogang
Hi Xiaogang,

Thanks for the comments. Please see the responses inline below.

On Tue, Nov 6, 2018 at 11:28 AM SHI Xiaogang <[hidden email]> wrote:

> Hi all,
>
> I think it's good to enhance the functionality and productivity of the
> Table API, but I still think SQL + DataStream is a better choice from a
> user-experience perspective:
> 1. The unification of batch and stream processing is very attractive, and
> many of our users are moving their batch-processing applications to Flink.
> These users are familiar with SQL and most of their applications are
> written in SQL-like languages. In Ad-hoc or interactive scenarios where
> users write new queries according to the results or previous queries, SQL
> is much more efficient than Table API.
>
As of now, both SQL and the Table API support batch and streaming. However,
their scopes differ a little. SQL is a query language that perfectly fits
scenarios such as OLAP/BI. However, it falls short in cases like ML/graph
processing, where a programming interface is needed. As you may have
noticed, some fundamental features such as iteration are missing from SQL.
Besides that, people rarely write algorithms in SQL. By the same argument
you make in (2), such users want a programming interface instead of a query
language. As Xiaowei mentioned, we are extending the Table API to a wider
range of use cases. Hence, enhancing the Table API is necessary to that
goal.
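To make the iteration point concrete, here is a minimal, Flink-agnostic
Python sketch of the kind of fixed-point loop that ML/graph algorithms such
as PageRank need: each step is a relational-style join-plus-aggregate, but
the surrounding loop with a convergence test has no counterpart in standard
SQL. All names below are illustrative, not Flink API:

```python
# Fixed-point iteration: the shape of many ML/graph algorithms.
# Each step looks relational ("join" ranks with edges, then "aggregate"
# contributions per target), but the loop-until-converged around it
# cannot be expressed in plain SQL.

def pagerank(edges, num_pages, damping=0.85, tol=1e-6):
    """edges: list of (src, dst) pairs; returns a rank per page."""
    ranks = {p: 1.0 / num_pages for p in range(num_pages)}
    out_degree = {}
    for src, _ in edges:
        out_degree[src] = out_degree.get(src, 0) + 1

    while True:
        # "join" ranks with edges, then "aggregate" contributions per dst
        contrib = {p: 0.0 for p in range(num_pages)}
        for src, dst in edges:
            contrib[dst] += ranks[src] / out_degree[src]
        new_ranks = {
            p: (1 - damping) / num_pages + damping * contrib[p]
            for p in range(num_pages)
        }
        delta = sum(abs(new_ranks[p] - ranks[p]) for p in range(num_pages))
        ranks = new_ranks
        if delta < tol:  # the convergence test drives the iteration
            return ranks
```

A programming interface lets the user write this loop directly; today such
jobs fall back to the DataStream/DataSet APIs.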

> 2. Though TableAPI is also declarative, most users typically refuse to
> learn another language, not to mention a fast-evolving one.
> 3. There are many efforts to be done in the optimization of both Table API
> and SQL. Given that most data processing systems, including Databases,
> MapReduce-like systems and Continuous Query systems, have done a lot of
> work in the optimization of SQL queries, it seems easier to adopt
> existing work in Flink with SQL than with the Table API.
>
WRT optimization, at the end of the day, SQL and the Table API share the
exact same query-optimization path. Therefore, any optimization made for
SQL is also available to the Table API, with no additional overhead. The
additional optimization opportunity Xiaowei mentioned is specifically for
UDFs, which are black boxes today and may become optimizable once expressed
symbolically.
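As a toy illustration of the black-box vs. symbolic distinction (plain
Python, not Flink API): when a projection is described symbolically, a
planner can fuse two chained projections into a single pass, whereas an
opaque lambda would block that rewrite because its body is invisible to the
optimizer.

```python
# Symbolic projection: maps column name -> constant offset added to it.
# Because the structure is visible, two chained projections can be
# fused into one ("project-project merge"); an opaque UDF could not be.

def compose(outer, inner):
    """Fuse select(outer) applied after select(inner) into one projection."""
    return {col: inner[col] + off for col, off in outer.items()}

def apply_projection(proj, row):
    return {col: row[col] + off for col, off in proj.items()}

inner = {"a": 1, "b": 2}       # select a + 1, b + 2
outer = {"a": 10, "b": 20}     # then select a + 10, b + 20
fused = compose(outer, inner)  # one pass: a + 11, b + 22

row = {"a": 0, "b": 0}
assert apply_projection(fused, row) == \
    apply_projection(outer, apply_projection(inner, row)) == {"a": 11, "b": 22}
```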

> 4. Data stream is still welcome as users can deal with complex logic with
> flexible interfaces. It is also widely adopted in critical missions where
> users can use a lot of "tricky" optimization to achieve optimal
> performance. Most of these tricks typically are not allowed in TableAPI or
> SQL.

Completely agree that DataStream is important for many advanced cases, and
we should also invest in making it stronger. Enhancing the Table API does
not mean getting rid of DataStream. The Table API is a higher-level API
with which users only need to focus on their business logic, while the
DataStream API is a lower-level API for users who want strong control over
all execution details. Both are valuable, to different users.
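The boundary can be sketched in plain Python (again illustrative only,
neither class below is Flink API): the same per-key running count written
once with user-owned state, DataStream-style, and once declaratively,
Table-style, where the engine owns the state and is free to optimize it.

```python
# The same running count, twice. In the explicit-state style the user
# creates, reads and updates the state object; in the declarative style
# the user states *what* (a grouped count), not *how*, so the engine
# could later swap in a smarter implementation without touching user code.

class ExplicitCount:
    """User-managed state, DataStream-flavored."""
    def __init__(self):
        self.state = {}             # key -> count, owned by the user

    def on_element(self, key):
        self.state[key] = self.state.get(key, 0) + 1
        return self.state[key]

def declarative_count(stream):
    """Declarative, Table-flavored: a grouped count, state hidden."""
    counts = {}
    for key in stream:
        counts[key] = counts.get(key, 0) + 1
    return counts

events = ["a", "b", "a", "a"]
op = ExplicitCount()
per_event = [op.on_element(k) for k in events]   # -> [1, 1, 2, 3]
assert declarative_count(events) == {"a": 3, "b": 1}
```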

>


> Regards,
> Xiaogang
>
> Xiaowei Jiang <[hidden email]> 于2018年11月5日周一 下午8:34写道:
>
> > Hi Fabian, these are great questions! I have some quick thoughts on some
> of
> > these.
> >
> > Optimization opportunities: I think that you are right UDFs are more like
> > black boxes today. However, this can change if we let users develop UDFs
> > symbolically in the future (i.e., Flink will look inside the UDF code,
> > understand it and potentially do codegen to execute it). This will open
> the
> > door for potential optimizations.
> >
> > Moving batch functionality to DataStream: I actually think that moving
> > batch functionality to Table API is probably a better idea. Currently,
> > DataStream API is very deterministic and imperative. On the other hand,
> > DataSet API has an optimizer and can choose very different execution
> plans.
> > Another distinguishing feature of DataStream API is that users get direct
> > access to state/statebackend which we intentionally avoided in Table API
> so
> > far. The introduction of states is probably the biggest innovation by
> > Flink. At the same time, abusing states may also be the largest source
> for
> > reliability/performance issues. Shielding users away from dealing with
> > state directly is a key advantage in Table API. I think this is probably
> a
> > good boundary between DataStream and Table API. If users HAVE to
> manipulate
> > state explicitly, go with DataStream, otherwise, go with Table API.
> >
> > Because Flink is extending into more and more scenarios (e.g., batch,
> > streaming & micro-service), we may inevitably end up with multiple APIs.
> It
> > appears that data analytics (batch & streaming) related applications can
> be
> > well served with Table API which not only unifies batch and streaming,
> but
> > also relieves the users from dealing with states explicitly. On the other
> > hand, DataStream is very convenient and powerful for micro-service kind
> of
> > applications because explicit state access may be necessary in such
> > cases.
> >
> > We will start a thread outlining what we are proposing in Table API.
> >
> > Regards,
> > Xiaowei
> >
> > On Mon, Nov 5, 2018 at 7:03 PM Fabian Hueske <[hidden email]> wrote:
> >
> > > Hi Jincheng,
> > >
> > > Thanks for this interesting proposal.
> > > I like that we can push this effort forward in a very fine-grained
> > manner,
> > > i.e., incrementally adding more APIs to the Table API.
> > >
> > > However, I also have a few questions / concerns.
> > > Today, the Table API is tightly integrated with the DataSet and
> > DataStream
> > > APIs. It is very easy to convert a Table into a DataSet or DataStream
> and
> > > vice versa. This means it is already easy to combine custom logic and
> > > relational operations. What I like is that several aspects are clearly
> > > separated like retraction and timestamp handling (see below) + all
> > > libraries on DataStream/DataSet can be easily combined with relational
> > > operations.
> > > I can see that adding more functionality to the Table API would remove
> > the
> > > distinction between DataSet and DataStream. However, wouldn't we get a
> > > similar benefit by extending the DataStream API for proper support for
> > > bounded streams (as is the long-term goal of Flink)?
> > > I'm also a bit skeptical about the optimization opportunities we would
> > > gain. Map/FlatMap UDFs are black boxes that cannot be easily removed
> > > without additional information (I did some research on this a few years
> > ago
> > > [1]).
> > >
> > > Moreover, I think there are a few tricky details that need to be
> resolved
> > > to enable a good integration.
> > >
> > > 1) How to deal with retraction messages? The DataStream API does not
> > have a
> > > notion of retractions. How would a MapFunction or FlatMapFunction
> handle
> > > retraction? Do they need to be aware of the change flag? Custom
> windowing
> > > and aggregation logic would certainly need to have that information.
> > > 2) How to deal with timestamps? The DataStream API does not give access
> > to
> > > timestamps. In the Table API / SQL these are exposed as regular
> > attributes.
> > > How can we ensure that timestamp attributes remain valid (i.e. aligned
> > with
> > > watermarks) if the output is produced by arbitrary code?
> > > There might be more issues of this kind.
> > >
> > > My main question would be how much would we gain with this proposal
> over
> > a
> > > tight integration of Table API and DataStream API, assuming that batch
> > > functionality is moved to DataStream?
> > >
> > > Best, Fabian
> > >
> > > [1] http://stratosphere.eu/assets/papers/openingTheBlackBoxes_12.pdf
> > >
> > >
> > > Am Mo., 5. Nov. 2018 um 02:49 Uhr schrieb Rong Rong <
> [hidden email]
> > >:
> > >
> > > > Hi Jincheng,
> > > >
> > > > Thank you for the proposal! I think being able to define a process /
> > > > co-process function in table API definitely opens up a whole new
> level
> > of
> > > > applications using a unified API.
> > > >
> > > > In addition, as Tzu-Li and Hequn have mentioned, the benefit of
> > > > optimization layer of Table API will already bring in additional
> > benefit
> > > > over directly programming on top of DataStream/DataSet API. I am very
> > > > interested and looking forward to seeing the support for more complex
> > use
> > > > cases, especially iterations. It will enable table API to define much
> > > > broader, event-driven use cases such as real-time ML
> > prediction/training.
> > > >
> > > > As Timo mentioned, This will make Table API diverge from the SQL API.
> > But
> > > > as from my experience Table API was always giving me the impression
> to
> > > be a
> > > > more sophisticated, syntactic-aware way to express relational
> > operations.
> > > > Looking forward to further discussion and collaborations on the FLIP
> > doc.
> > > >
> > > > --
> > > > Rong
> > > >
> > > > On Sun, Nov 4, 2018 at 5:22 PM jincheng sun <
> [hidden email]>
> > > > wrote:
> > > >
> > > > > Hi tison,
> > > > >
> > > > > Thanks a lot for your feedback!
> > > > > I am very happy to see that community contributors agree to enhance
> > > > > the Table API. This is long-term, continuous work that we will push
> > > > > in stages; we will soon complete the enhancement list for the first
> > > > > phase, and we can then go into deep discussion in the Google doc.
> > > > > Thanks again for joining this very important discussion of the
> > > > > Flink Table API.
> > > > >
> > > > > Thanks,
> > > > > Jincheng
> > > > >
> > > > > Tzu-Li Chen <[hidden email]> 于2018年11月2日周五 下午1:49写道:
> > > > >
> > > > > > Hi Jincheng,
> > > > > >
> > > > > > Thanks a lot for your proposal! I find it a good starting point
> > > > > > for internal optimization work, and it helps make Flink more
> > > > > > user-friendly.
> > > > > >