Support local aggregate push down for Blink batch planner

Support local aggregate push down for Blink batch planner

Sebastian Liu
Hi all,

I'd like to discuss a new feature for the Blink Planner.
The Aggregate operator of Flink SQL is currently executed entirely at the Flink layer.
As storage systems evolve, many downstream storages of Flink SQL can handle the
aggregation themselves. Pushing the aggregate down to the data source layer will
improve performance by reducing both network IO and computation overhead.
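
As a small, made-up example of the kind of query this targets (table name,
schema, and class names below are illustrative only):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AggregatePushDownExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        // Assume an `orders` table backed by an external system (e.g. the JDBC
        // connector) is already registered in the catalog.
        // Today the scan ships every row of `orders` into Flink and the whole
        // aggregation runs in Flink operators; with aggregate push down the
        // source could instead return just one row per o_custkey.
        tEnv.executeSql(
                "SELECT o_custkey, COUNT(*) AS order_cnt, SUM(o_totalprice) AS revenue "
                        + "FROM orders GROUP BY o_custkey")
            .print();
    }
}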

I have drafted a design doc for this new feature.
https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing

Any comment or discussion is welcome.

--

With kind regards
------------------------------------------------------------
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: [hidden email] <[hidden email]>
QQ: 3239559

Re: Support local aggregate push down for Blink batch planner

Jark Wu-2
Hi Sebastian,

Thanks for the proposal. I think this is a great improvement for Flink SQL.
I went through the design doc and have the following thoughts:

1) Flink has deprecated the legacy TableSource in 1.11 and proposed a new set
of DynamicTableSource interfaces. Could you update your proposal to use the new
interfaces, following the existing ability interfaces such as
SupportsFilterPushDown and SupportsProjectionPushDown?

2) Personally, I think CallExpression would be a better representation than a
separate `FunctionDefinition` plus args, because it makes it easier to know the
index and type of each argument (see the sketch below this list).

3) It would be better if the proposal listed which connectors will be
supported.
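
As a rough sketch of why CallExpression helps here: it bundles the function
definition, the resolved arguments, and the result type in one object (method
names below are from the org.apache.flink.table.expressions classes as I
understand them, so please double-check against the actual API):

import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;

final class PushedAggregateInspector {
    // e.g. for a pushed-down SUM(price)
    void inspect(CallExpression call) {
        FunctionDefinition function = call.getFunctionDefinition(); // built-in SUM
        DataType resultType = call.getOutputDataType();             // result type of SUM(price)
        for (ResolvedExpression arg : call.getResolvedChildren()) {
            if (arg instanceof FieldReferenceExpression) {
                FieldReferenceExpression field = (FieldReferenceExpression) arg;
                int fieldIndex = field.getFieldIndex();             // index of `price` in the source row
                DataType fieldType = field.getOutputDataType();     // type of `price`
            }
        }
    }
}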

Best,
Jark


Re: Support local aggregate push down for Blink batch planner

Sebastian Liu
Hi Jark, Thx a lot for your quick reply and valuable suggestions.

For (1): Agree. Since we are in the middle of migrating to the new table source
API, the new optimizer rule should definitely be based on the new interfaces;
otherwise we would have to upgrade it sooner or later. I have changed the
proposal to define SupportsAggregatePushDown as an ability interface.

For (2): Agree. Using CallExpression is a better choice, and I have resolved
this comment in the proposal.

For (3): I suggest we support the JDBC connector first, since we don't have a
Druid connector and the Elasticsearch connector only has a sink API at present.
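
A rough illustration of the idea for the JDBC connector (the statement below is
only a sketch of what the source could ship to the database, not actual
connector code):

// For a Flink query like
//   SELECT o_custkey, COUNT(*), SUM(o_totalprice) FROM orders GROUP BY o_custkey
// the JDBC scan today reads roughly "SELECT o_custkey, o_totalprice FROM orders"
// and every row crosses the network. If the source accepts the pushed local
// aggregation, it could issue something like the following instead, so that
// only one row per group is transferred:
String pushedDownScanQuery =
        "SELECT o_custkey, COUNT(*), SUM(o_totalprice) FROM orders GROUP BY o_custkey";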

But perhaps the biggest open question is whether we should go with idea 1 or
idea 2 in the proposal.

What do you think? Once we reach agreement on the proposal, our team can drive
the implementation of this feature.

Re: Support local aggregate push down for Blink batch planner

Kurt Young
Local aggregation is more like a physical operator than a logical operator. I
would suggest going with idea #1.

Best,
Kurt


Re: Support local aggregate push down for Blink batch planner

Sebastian Liu
Hi Kurt,

Thx a lot for your feedback. If local aggregation is more like a physical
operator than a logical operator, shouldn't your suggestion be idea #2, which
handles everything in the physical optimization phase?

Looking forward to further discussion.


Re: Support local aggregate push down for Blink batch planner

Kurt Young
 Sorry for the typo -_-!
I meant idea #2.

Best,
Kurt


Re: Support local aggregate push down for Blink batch planner

Jark Wu-2
I'm also +1 for idea #2.

Regarding the updated interface,

Result applyAggregates(List<CallExpression> aggregateExpressions,
        int[] groupSet, DataType aggOutputDataType);

final class Result {
    private final List<CallExpression> acceptedAggregates;
    private final List<CallExpression> remainingAggregates;
}

I have the following comments:

1) Do we need the composite Result return type? Is a boolean return type
enough?
    From my understanding, all of the aggregates should be accepted, otherwise
the pushdown should fail. Therefore, users don't need to distinguish which
aggregates are "accepted".

2) Does the `aggOutputDataType` represent the produced data type of the new
source, or just the return type of all the agg functions?
    I would prefer `producedDataType`, just like `SupportsReadingMetadata`, to
reduce the effort for users to concatenate a final output type.
    The return type of each agg function can be obtained from the
`CallExpression`.

3) What do you think about renaming `groupSet` to `grouping` or
`groupedFields`?
    The name `groupSet` may make users think it relates to "grouping sets".


What do you think?

Best,
Jark



Re: Support local aggregate push down for Blink batch planner

Sebastian Liu
In reply to this post by Kurt Young
Thanks for the clarification. I have resolved all of the comments and added a
conclusion section.

Looking forward to further feedback from the community. Once we reach consensus
on the design doc, I can push forward the implementation work.

Re: Support local aggregate push down for Blink batch planner

Sebastian Liu
In reply to this post by Jark Wu-2
Hi Jark,

Thx for your further feedback and help. The interface of
SupportsAggregatePushDown may indeed need some adjustments.

For (1) Agree: Yeah, the caller only needs to know whether the TableSource can
handle all of the aggregates. It's better to just return a boolean indicating
whether the whole aggregate push-down succeeded. [Resolved in proposal]

For (2) Agree: The aggOutputDataType represents the produced data type of the
new table source, so that the new source can connect to the downstream exchange
node. Its format is the grouping fields' types followed by the aggregate
functions' return types. The parameter also makes it easier for connector
developers to build the final output type. I have renamed it to
producedDataType. [Resolved in proposal]

For (3) Agree: Indeed, groupSet may mislead users; I have changed it to
groupingFields. [Resolved in proposal]
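
Putting the three changes together, the updated signature in the proposal now
looks roughly like the following (the design doc remains the source of truth):

/*
 * Pushes a local aggregation into the scan. Returns true only if the source
 * accepts all of the given aggregates; returning false leaves the plan
 * unchanged.
 *
 * producedDataType is the row type the source emits afterwards: the grouping
 * fields' types followed by the aggregate functions' result types.
 */
boolean applyAggregates(
        List<CallExpression> aggregateExpressions,
        int[] groupingFields,
        DataType producedDataType);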

Thx again for the suggestions, looking forward to further discussion.

Re: Support local aggregate push down for Blink batch planner

Jingsong Li
Thanks for your proposal, Sebastian!

+1 for SupportsAggregatePushDown. The wonderful discussion above has already
resolved many of my concerns.

## Semantic problems

We may need to add some mechanisms or comments, because as far as I know the
semantics actually differ between databases, and this may need to be reflected
in the specific connector implementations.

For example, the output type of AVG varies across databases: MySQL returns
DOUBLE, which differs from Flink. What should we do? (Luckily, AVG will be
split into SUM and COUNT, but we still need to take care of DECIMAL and other
types.)
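
To make the AVG case concrete (this assumes the planner has already split AVG
into SUM and COUNT for two-phase aggregation before the push-down rule fires):

// Query:                 SELECT dept, AVG(salary) FROM emp GROUP BY dept
// Local phase, pushed to the source (only SUM and COUNT are shipped):
//   SELECT dept, SUM(salary) AS sum_salary, COUNT(salary) AS cnt_salary
//   FROM emp GROUP BY dept
// Global phase, computed in Flink:
//   SUM(sum_salary) / SUM(cnt_salary)
// The external system never evaluates AVG itself, so its AVG return type
// (e.g. DOUBLE in MySQL) does not leak into the result; what still needs care
// is that SUM and COUNT come back with the types Flink expects, e.g. DECIMAL
// precision.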

## The phase of push-down rule

I strongly recommend that you do not put it in the Volcano phase, which may
make the cost calculation very troublesome.
So in PHYSICAL_REWRITE?

## About interface

For scalability, I slightly recommend that we introduce an `Aggregate`
interface, it contains `List<CallExpression> aggregateExpressions, int[]
groupingFields, DataType producedDataType` fields. In this way, we can add
fields easily without breaking compatibility.
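
A rough sketch of that variant, following the style of the Result class earlier
in the thread (names are only illustrative, not a final proposal):

boolean applyAggregates(Aggregate aggregate);

final class Aggregate {
    private final List<CallExpression> aggregateExpressions;
    private final int[] groupingFields;
    private final DataType producedDataType;
    // constructor and getters omitted; new fields can be added here later
    // without changing the applyAggregates signature.
}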

I think the current design is very good; I'm just putting forward some ideas.

Best,
Jingsong

Re: Support local aggregate push down for Blink batch planner

Jark Wu-2
Thanks for the update.

The proposal looks good to me now.

Best,
Jark

Re: Support local aggregate push down for Blink batch planner

Sebastian Liu
In reply to this post by Jingsong Li
Hi Jingsong,

Thx a lot for your suggestions. These points really need to be made clear in
the proposal.

For the semantic problem, I think the main point is the mismatch between the
return types of the target aggregate functions and the row format returned by
the underlying storage. That's why the SupportsAggregatePushDown interface
provides the producedDataType. We need to make developers aware that the
semantic differences between the underlying storage system and Flink have to be
handled in the related connectors. [Supplemented in proposal]

For the phase of the new PushLocalAggIntoTableSourceScanRule, that is also a
key point. As you suggested, we should put it into the PHYSICAL_REWRITE rule
set, preferably behind the EnforceLocalXXAggRule rules. [Supplemented in
proposal]

For the scalability of the interface, I don't fully understand your suggestion.
Do you mean adding an abstract class that implements the
SupportsAggregatePushDown interface and holds the `List<CallExpression>
aggregateExpressions`, `int[] groupingFields`, and `DataType producedDataType`
fields?

Looking forward to your further feedback or guidance.

Jingsong Li <[hidden email]> wrote on Tue, Jan 5, 2021 at 2:44 PM:

> Thanks for your proposal! Sebastian.
>
> +1 for SupportsAggregatePushDown. The above wonderful discussion has solved
> many of my concerns.
>
> ## Semantic problems
>
> We may need to add some mechanisms or comments, because as far as I know,
> the semantics of each database is actually different, which may need to be
> reflected in your specific implementation.
>
> For example, the AVG output types of various databases may be different.
> For example, MySQL outputs double, this is different from Flink. What
> should we do? (Lucky, avg will be splitted into sum and count, But we also
> need care about decimal and others)
>
> ## The phase of push-down rule
>
> I strongly recommend that you do not put it in the Volcano phase, which may
> make the cost calculation very troublesome.
> So in PHYSICAL_REWRITE?
>
> ## About interface
>
> For scalability, I slightly recommend that we introduce an `Aggregate`
> interface, it contains `List<CallExpression> aggregateExpressions, int[]
> groupingFields, DataType producedDataType` fields. In this way, we can add
> fields easily without breaking compatibility.
>
> I think the current design is very good, just put forward some ideas.
>
> Best,
> Jingsong
>
> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <[hidden email]>
> wrote:
>
> > Hi Jark,
> >
> > Thx for your further feedback and help. The interface of
> > SupportsAggregatePushDown may indeed need some adjustments.
> >
> > For (1) Agree: Yeah, the upstream only need to know if the TableSource
> can
> > handle all of the aggregates.
> > It's better to just return a boolean type to indicate whether all of
> > aggregates push down was successful or not. [Resolved in proposal]
> >
> > For (2) Agree: The aggOutputDataType represent the produced data type of
> > the new table source to make sure that the new table source can
> > connect with the related exchange node. The format of this
> > aggOutputDataType is groupedFields's type + agg function's return type.
> > The reason for adding this parameter in this function is also to
> facilitate
> > the user to build the final output type. I have changed this parameter
> > to be producedDataType. [Resolved in proposal]
> >
> > For (3) Agree: Indeed, groupSet may mislead users, I have changed to use
> > groupingFields. [Resolved in proposal]
> >
> > Thx again for the suggestion, looking for the further discussion.
> >
> > Jark Wu <[hidden email]> 于2021年1月5日周二 下午12:05写道:
> >
> > > I'm also +1 for idea#2.
> > >
> > > Regarding to the updated interface,
> > >
> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
> > >      int[] groupSet, DataType aggOutputDataType);
> > >
> > > final class Result {
> > >        private final List<CallExpression> acceptedAggregates;
> > >        private final List<CallExpression> remainingAggregates;
> > > }
> > >
> > > I have following comments:
> > >
> > > 1) Do we need the composite Result return type? Is a boolean return
> type
> > > enough?
> > >     From my understanding, all of the aggregates should be accepted,
> > > otherwise the pushdown should fail.
> > >     Therefore, users don't need to distinguish which aggregates are
> > > "accepted".
> > >
> > > 2) Does the `aggOutputDataType` represent the produced data type of the
> > > new source, or just the return type of all the agg functions?
> > >     I would prefer to `producedDataType` just like
> > > `SupportsReadingMetadata` to reduce the effort for users to concat a
> > final
> > > output type.
> > >     The return type of each agg function can be obtained from the
> > > `CallExpression`.
> > >
> > > 3) What do you think about renaming `groupSet` to `grouping` or
> > > `groupedFields` ?
> > >     The `groupSet` may confuse users that it relates to "grouping
> sets".
> > >
> > >
> > > What do you think?
> > >
> > > Best,
> > > Jark
> > >
> > >
> > >
> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <[hidden email]> wrote:
> > >
> > >> Sorry for the typo -_-!
> > >> I meant idea #2.
> > >>
> > >> Best,
> > >> Kurt
> > >>
> > >>
> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <[hidden email]>
> > >> wrote:
> > >>
> > >>> Hi Kurt,
> > >>>
> > >>> Thx a lot for your feedback. If local aggregation is more like a
> > >>> physical operator rather than logical
> > >>> operator, I think your suggestion should be idea #2 which handle all
> in
> > >>> the physical optimization phase?
> > >>>
> > >>> Looking forward for the further discussion.
> > >>>
> > >>>
> > >>> Kurt Young <[hidden email]> 于2021年1月5日周二 上午9:52写道:
> > >>>
> > >>>> Local aggregation is more like a physical operator rather than
> logical
> > >>>> operator. I would suggest going with idea #1.
> > >>>>
> > >>>> Best,
> > >>>> Kurt
> > >>>>
> > >>>>
> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
> [hidden email]>
> > >>>> wrote:
> > >>>>
> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
> > >>>> > For (1): Agree: Since we are in the period of upgrading the new
> > table
> > >>>> > source api,
> > >>>> > we really should consider the new interface for the new optimize
> > >>>> rule. If
> > >>>> > the new rule
> > >>>> > doesn't use the new api, we'll have to upgrade it sooner or
> later. I
> > >>>> have
> > >>>> > change to use
> > >>>> > the ability interface for the SupportsAggregatePushDown definition
> > in
> > >>>> above
> > >>>> > proposal.
> > >>>> >
> > >>>> > For (2): Agree: Change to use CallExpression is a better choice,
> and
> > >>>> have
> > >>>> > resolved this
> > >>>> > comment in the proposal.
> > >>>> >
> > >>>> > For (3): I suggest we first support the JDBC connector, as we
> don't
> > >>>> have
> > >>>> > Druid connector
> > >>>> > and ES connector just has sink api at present.
> > >>>> >
> > >>>> > But perhaps the biggest question may be whether we should use
> idea 1
> > >>>> or
> > >>>> > idea 2 in proposal.
> > >>>> >
> > >>>> > What do you think?  After we reach the agreement on the proposal,
> > our
> > >>>> team
> > >>>> > can drive to
> > >>>> > complete this feature.
> > >>>> >
> > >>>> > Jark Wu <[hidden email]> 于2020年12月29日周二 下午2:58写道:
> > >>>> >
> > >>>> > > Hi Sebastian,
> > >>>> > >
> > >>>> > > Thanks for the proposal. I think this is a great improvement for
> > >>>> Flink
> > >>>> > SQL.
> > >>>> > > I went through the design doc and have following thoughts:
> > >>>> > >
> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
> > proposed
> > >>>> a new
> > >>>> > >  set of DynamicTableSource interfaces. Could you update your
> > >>>> proposal to
> > >>>> > > use the new interfaces?
> > >>>> > >  Follow the existing ability interfaces, e.g.
> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
> > >>>> > >
> > >>>> > > 2) Personally, I think CallExpression would be a better
> > >>>> representation
> > >>>> > than
> > >>>> > > separate `FunctionDefinition` and args. Because, it would be
> > easier
> > >>>> to
> > >>>> > know
> > >>>> > > what's the index and type of the arguments.
> > >>>> > >
> > >>>> > > 3) It would be better to list which connectors will be supported
> > in
> > >>>> the
> > >>>> > > plan?
> > >>>> > >
> > >>>> > > Best,
> > >>>> > > Jark
> > >>>> > >
> > >>>> > >
> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
> > [hidden email]>
> > >>>> > wrote:
> > >>>> > >
> > >>>> > > > Hi all,
> > >>>> > > >
> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
> > >>>> > > > Aggregate operator of Flink SQL is currently fully done at
> Flink
> > >>>> layer.
> > >>>> > > > With the developing of storage, many downstream storage of
> Flink
> > >>>> SQL
> > >>>> > has
> > >>>> > > > the ability to deal with Aggregation operator.
> > >>>> > > > Pushing down Aggregate to data source layer will improve
> > >>>> performance
> > >>>> > from
> > >>>> > > > the perspective of the network IO and computation overhead.
> > >>>> > > >
> > >>>> > > > I have drafted a design doc for this new feature.
> > >>>> > > >
> > >>>> > > >
> > >>>> > >
> > >>>> >
> > >>>>
> >
> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
> > >>>> > > >
> > >>>> > > > Any comment or discussion is welcome.
> > >>>> > > >
> > >>>> > > > --
> > >>>> > > >
> > >>>> > > > *With kind regards
> > >>>> > > > ------------------------------------------------------------
> > >>>> > > > Sebastian Liu 刘洋
> > >>>> > > > Institute of Computing Technology, Chinese Academy of Science
> > >>>> > > > Mobile\WeChat: +86—15201613655
> > >>>> > > > E-mail: [hidden email] <[hidden email]>
> > >>>> > > > QQ: 3239559*
> > >>>> > > >
> > >>>> > >
> > >>>> >
> > >>>> >
> > >>>> > --
> > >>>> >
> > >>>> > *With kind regards
> > >>>> > ------------------------------------------------------------
> > >>>> > Sebastian Liu 刘洋
> > >>>> > Institute of Computing Technology, Chinese Academy of Science
> > >>>> > Mobile\WeChat: +86—15201613655
> > >>>> > E-mail: [hidden email] <[hidden email]>
> > >>>> > QQ: 3239559*
> > >>>> >
> > >>>>
> > >>>
> > >>>
> > >>> --
> > >>>
> > >>> *With kind regards
> > >>> ------------------------------------------------------------
> > >>> Sebastian Liu 刘洋
> > >>> Institute of Computing Technology, Chinese Academy of Science
> > >>> Mobile\WeChat: +86—15201613655
> > >>> E-mail: [hidden email] <[hidden email]>
> > >>> QQ: 3239559*
> > >>>
> > >>>
> >
> > --
> >
> > *With kind regards
> > ------------------------------------------------------------
> > Sebastian Liu 刘洋
> > Institute of Computing Technology, Chinese Academy of Science
> > Mobile\WeChat: +86—15201613655
> > E-mail: [hidden email] <[hidden email]>
> > QQ: 3239559*
> >
>
>
> --
> Best, Jingsong Lee
>


--

*With kind regards
------------------------------------------------------------
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: [hidden email] <[hidden email]>
QQ: 3239559*

Re: Support local aggregate push down for Blink batch planner

Jingsong Li
Hi Sebastian,

Well, I mean:

```
boolean applyAggregates(
    int[] groupingFields,
    List<CallExpression> aggregateExpressions,
    DataType producedDataType);
```
VS
```
boolean applyAggregates(Aggregation agg);

interface Aggregation {
  int[] groupingFields();
  List<CallExpression> aggregateExpressions();
  DataType producedDataType();
}
```

Maybe I've over-thought it, but I think Aggregation is a complicated thing.
We may need to extend its parameters in the future, so making the parameters
an interface is conducive to future expansion without breaking the
compatibility of user implementations. With the flat signature above, users
would need to modify their code.
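
To make the point concrete, a rough sketch (the filterExpressions() accessor
below is purely hypothetical, just to show how an addition would look):

```
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.types.DataType;

import java.util.Collections;
import java.util.List;

// Sketch only: the parameter-object variant of the ability interface.
public interface SupportsAggregatePushDown {

    boolean applyAggregates(Aggregation aggregation);

    interface Aggregation {
        int[] groupingFields();

        List<CallExpression> aggregateExpressions();

        DataType producedDataType();

        // Hypothetical future extension: because connectors only read from this
        // interface, a new accessor with a default implementation can be added
        // later without touching applyAggregates or any existing connector code.
        default List<CallExpression> filterExpressions() {
            return Collections.emptyList();
        }
    }
}
```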

Best,
Jingsong

On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <[hidden email]> wrote:

> Hi Jinsong,
>
> Thx a lot for your suggestion. These points really need to be clear in the
> proposal.
>
> For the semantic problem, I think the main point is the different returned
> data types
> for the target aggregate function and the row format returned by the
> underlying storage.
> That's why we provide the producedDataType in the
> SupportsAggregatePushDown interface.
> Need to let developers know that we need to handle the semantic
> differences between
> the underlying storage system and Flink in related connectors.
> [Supplemented in proposal]
>
> For the phase of the new PushLocalAggIntoTableSourceScanRule rule, it's also
> a key point.
> As you suggested, we should put it into the PHYSICAL_REWRITE rule set,
> and better to put it
> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>
> For the scalability of the interface, actually I don't exactly understand
> your suggestion. Is it to add
> an abstract class, to implement the SupportsAggregatePushDown interface,
> and holds the
> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
> DataType producedDataType`
> fields?
>
> Looking forward to your further feedback or guidance.
>
> Jingsong Li <[hidden email]> 于2021年1月5日周二 下午2:44写道:
>
>> Thanks for your proposal! Sebastian.
>>
>> +1 for SupportsAggregatePushDown. The above wonderful discussion has
>> solved
>> many of my concerns.
>>
>> ## Semantic problems
>>
>> We may need to add some mechanisms or comments, because as far as I know,
>> the semantics of each database is actually different, which may need to be
>> reflected in your specific implementation.
>>
>> For example, the AVG output types of various databases may be different.
>> For example, MySQL outputs double, this is different from Flink. What
>> should we do? (Lucky, avg will be splitted into sum and count, But we also
>> need care about decimal and others)
>>
>> ## The phase of push-down rule
>>
>> I strongly recommend that you do not put it in the Volcano phase, which
>> may
>> make the cost calculation very troublesome.
>> So in PHYSICAL_REWRITE?
>>
>> ## About interface
>>
>> For scalability, I slightly recommend that we introduce an `Aggregate`
>> interface, it contains `List<CallExpression> aggregateExpressions, int[]
>> groupingFields, DataType producedDataType` fields. In this way, we can add
>> fields easily without breaking compatibility.
>>
>> I think the current design is very good, just put forward some ideas.
>>
>> Best,
>> Jingsong
>>
>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <[hidden email]>
>> wrote:
>>
>> > Hi Jark,
>> >
>> > Thx for your further feedback and help. The interface of
>> > SupportsAggregatePushDown may indeed need some adjustments.
>> >
>> > For (1) Agree: Yeah, the upstream only need to know if the TableSource
>> can
>> > handle all of the aggregates.
>> > It's better to just return a boolean type to indicate whether all of
>> > aggregates push down was successful or not. [Resolved in proposal]
>> >
>> > For (2) Agree: The aggOutputDataType represent the produced data type of
>> > the new table source to make sure that the new table source can
>> > connect with the related exchange node. The format of this
>> > aggOutputDataType is groupedFields's type + agg function's return type.
>> > The reason for adding this parameter in this function is also to
>> facilitate
>> > the user to build the final output type. I have changed this parameter
>> > to be producedDataType. [Resolved in proposal]
>> >
>> > For (3) Agree: Indeed, groupSet may mislead users, I have changed to use
>> > groupingFields. [Resolved in proposal]
>> >
>> > Thx again for the suggestion, looking for the further discussion.
>> >
>> > Jark Wu <[hidden email]> 于2021年1月5日周二 下午12:05写道:
>> >
>> > > I'm also +1 for idea#2.
>> > >
>> > > Regarding to the updated interface,
>> > >
>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>> > >      int[] groupSet, DataType aggOutputDataType);
>> > >
>> > > final class Result {
>> > >        private final List<CallExpression> acceptedAggregates;
>> > >        private final List<CallExpression> remainingAggregates;
>> > > }
>> > >
>> > > I have following comments:
>> > >
>> > > 1) Do we need the composite Result return type? Is a boolean return
>> type
>> > > enough?
>> > >     From my understanding, all of the aggregates should be accepted,
>> > > otherwise the pushdown should fail.
>> > >     Therefore, users don't need to distinguish which aggregates are
>> > > "accepted".
>> > >
>> > > 2) Does the `aggOutputDataType` represent the produced data type of
>> the
>> > > new source, or just the return type of all the agg functions?
>> > >     I would prefer to `producedDataType` just like
>> > > `SupportsReadingMetadata` to reduce the effort for users to concat a
>> > final
>> > > output type.
>> > >     The return type of each agg function can be obtained from the
>> > > `CallExpression`.
>> > >
>> > > 3) What do you think about renaming `groupSet` to `grouping` or
>> > > `groupedFields` ?
>> > >     The `groupSet` may confuse users that it relates to "grouping
>> sets".
>> > >
>> > >
>> > > What do you think?
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > >
>> > >
>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <[hidden email]> wrote:
>> > >
>> > >> Sorry for the typo -_-!
>> > >> I meant idea #2.
>> > >>
>> > >> Best,
>> > >> Kurt
>> > >>
>> > >>
>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <[hidden email]
>> >
>> > >> wrote:
>> > >>
>> > >>> Hi Kurt,
>> > >>>
>> > >>> Thx a lot for your feedback. If local aggregation is more like a
>> > >>> physical operator rather than logical
>> > >>> operator, I think your suggestion should be idea #2 which handle
>> all in
>> > >>> the physical optimization phase?
>> > >>>
>> > >>> Looking forward for the further discussion.
>> > >>>
>> > >>>
>> > >>> Kurt Young <[hidden email]> 于2021年1月5日周二 上午9:52写道:
>> > >>>
>> > >>>> Local aggregation is more like a physical operator rather than
>> logical
>> > >>>> operator. I would suggest going with idea #1.
>> > >>>>
>> > >>>> Best,
>> > >>>> Kurt
>> > >>>>
>> > >>>>
>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>> [hidden email]>
>> > >>>> wrote:
>> > >>>>
>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>> > >>>> > For (1): Agree: Since we are in the period of upgrading the new
>> > table
>> > >>>> > source api,
>> > >>>> > we really should consider the new interface for the new optimize
>> > >>>> rule. If
>> > >>>> > the new rule
>> > >>>> > doesn't use the new api, we'll have to upgrade it sooner or
>> later. I
>> > >>>> have
>> > >>>> > change to use
>> > >>>> > the ability interface for the SupportsAggregatePushDown
>> definition
>> > in
>> > >>>> above
>> > >>>> > proposal.
>> > >>>> >
>> > >>>> > For (2): Agree: Change to use CallExpression is a better choice,
>> and
>> > >>>> have
>> > >>>> > resolved this
>> > >>>> > comment in the proposal.
>> > >>>> >
>> > >>>> > For (3): I suggest we first support the JDBC connector, as we
>> don't
>> > >>>> have
>> > >>>> > Druid connector
>> > >>>> > and ES connector just has sink api at present.
>> > >>>> >
>> > >>>> > But perhaps the biggest question may be whether we should use
>> idea 1
>> > >>>> or
>> > >>>> > idea 2 in proposal.
>> > >>>> >
>> > >>>> > What do you think?  After we reach the agreement on the proposal,
>> > our
>> > >>>> team
>> > >>>> > can drive to
>> > >>>> > complete this feature.
>> > >>>> >
>> > >>>> > Jark Wu <[hidden email]> 于2020年12月29日周二 下午2:58写道:
>> > >>>> >
>> > >>>> > > Hi Sebastian,
>> > >>>> > >
>> > >>>> > > Thanks for the proposal. I think this is a great improvement
>> for
>> > >>>> Flink
>> > >>>> > SQL.
>> > >>>> > > I went through the design doc and have following thoughts:
>> > >>>> > >
>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
>> > proposed
>> > >>>> a new
>> > >>>> > >  set of DynamicTableSource interfaces. Could you update your
>> > >>>> proposal to
>> > >>>> > > use the new interfaces?
>> > >>>> > >  Follow the existing ability interfaces, e.g.
>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>> > >>>> > >
>> > >>>> > > 2) Personally, I think CallExpression would be a better
>> > >>>> representation
>> > >>>> > than
>> > >>>> > > separate `FunctionDefinition` and args. Because, it would be
>> > easier
>> > >>>> to
>> > >>>> > know
>> > >>>> > > what's the index and type of the arguments.
>> > >>>> > >
>> > >>>> > > 3) It would be better to list which connectors will be
>> supported
>> > in
>> > >>>> the
>> > >>>> > > plan?
>> > >>>> > >
>> > >>>> > > Best,
>> > >>>> > > Jark
>> > >>>> > >
>> > >>>> > >
>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>> > [hidden email]>
>> > >>>> > wrote:
>> > >>>> > >
>> > >>>> > > > Hi all,
>> > >>>> > > >
>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>> > >>>> > > > Aggregate operator of Flink SQL is currently fully done at
>> Flink
>> > >>>> layer.
>> > >>>> > > > With the developing of storage, many downstream storage of
>> Flink
>> > >>>> SQL
>> > >>>> > has
>> > >>>> > > > the ability to deal with Aggregation operator.
>> > >>>> > > > Pushing down Aggregate to data source layer will improve
>> > >>>> performance
>> > >>>> > from
>> > >>>> > > > the perspective of the network IO and computation overhead.
>> > >>>> > > >
>> > >>>> > > > I have drafted a design doc for this new feature.
>> > >>>> > > >
>> > >>>> > > >
>> > >>>> > >
>> > >>>> >
>> > >>>>
>> >
>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>> > >>>> > > >
>> > >>>> > > > Any comment or discussion is welcome.
>> > >>>> > > >
>> > >>>> > > > --
>> > >>>> > > >
>> > >>>> > > > *With kind regards
>> > >>>> > > > ------------------------------------------------------------
>> > >>>> > > > Sebastian Liu 刘洋
>> > >>>> > > > Institute of Computing Technology, Chinese Academy of Science
>> > >>>> > > > Mobile\WeChat: +86—15201613655
>> > >>>> > > > E-mail: [hidden email] <[hidden email]>
>> > >>>> > > > QQ: 3239559*
>> > >>>> > > >
>> > >>>> > >
>> > >>>> >
>> > >>>> >
>> > >>>> > --
>> > >>>> >
>> > >>>> > *With kind regards
>> > >>>> > ------------------------------------------------------------
>> > >>>> > Sebastian Liu 刘洋
>> > >>>> > Institute of Computing Technology, Chinese Academy of Science
>> > >>>> > Mobile\WeChat: +86—15201613655
>> > >>>> > E-mail: [hidden email] <[hidden email]>
>> > >>>> > QQ: 3239559*
>> > >>>> >
>> > >>>>
>> > >>>
>> > >>>
>> > >>> --
>> > >>>
>> > >>> *With kind regards
>> > >>> ------------------------------------------------------------
>> > >>> Sebastian Liu 刘洋
>> > >>> Institute of Computing Technology, Chinese Academy of Science
>> > >>> Mobile\WeChat: +86—15201613655
>> > >>> E-mail: [hidden email] <[hidden email]>
>> > >>> QQ: 3239559*
>> > >>>
>> > >>>
>> >
>> > --
>> >
>> > *With kind regards
>> > ------------------------------------------------------------
>> > Sebastian Liu 刘洋
>> > Institute of Computing Technology, Chinese Academy of Science
>> > Mobile\WeChat: +86—15201613655
>> > E-mail: [hidden email] <[hidden email]>
>> > QQ: 3239559*
>> >
>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
>
> *With kind regards
> ------------------------------------------------------------
> Sebastian Liu 刘洋
> Institute of Computing Technology, Chinese Academy of Science
> Mobile\WeChat: +86—15201613655
> E-mail: [hidden email] <[hidden email]>
> QQ: 3239559*
>
>

--
Best, Jingsong Lee

Re: Support local aggregate push down for Blink batch planner

Jark Wu-2
I think this may be over-designed. We should have confidence in the interface
we design; the interface should be stable.
Wrapping things in a big context object comes at the cost of user convenience.
Foremost, we don't see any parameters to add in the future. Do you know of any
potential parameters?

Best,
Jark

On Wed, 6 Jan 2021 at 10:28, Jingsong Li <[hidden email]> wrote:

> Hi Sebastian,
>
> Well, I mean:
>
> `boolean applyAggregates(int[] groupingFields, List<CallExpression>
> aggregateExpressions, DataType producedDataType);`
> VS
> ```
> boolean applyAggregates(Aggregation agg);
>
> interface Aggregation {
>   int[] groupingFields();
>   List<CallExpression> aggregateExpressions();
>   DataType producedDataType();
> }
> ```
>
> Maybe I've over considered it, but I think Aggregation is a complicated
> thing. Maybe we need to extend its parameters in the future, so make the
> parameters interface, which is conducive to the future expansion without
> destroying the compatibility of user implementation. If it is the way
> before, users need to modify the code.
>
> Best,
> Jingsong
>
> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <[hidden email]>
> wrote:
>
>> Hi Jinsong,
>>
>> Thx a lot for your suggestion. These points really need to be clear in
>> the proposal.
>>
>> For the semantic problem, I think the main point is the different
>> returned data types
>> for the target aggregate function and the row format returned by the
>> underlying storage.
>> That's why we provide the producedDataType in the
>> SupportsAggregatePushDown interface.
>> Need to let developers know that we need to handle the semantic
>> differences between
>> the underlying storage system and Flink in related connectors.
>> [Supplemented in proposal]
>>
>> For the phase of the new PushLocalAggIntoTableSourceScanRule rule, it's also
>> a key point.
>> As you suggested, we should put it into the PHYSICAL_REWRITE rule set,
>> and better to put it
>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>
>> For the scalability of the interface, actually I don't exactly understand
>> your suggestion. Is it to add
>> an abstract class, to implement the SupportsAggregatePushDown interface,
>> and holds the
>> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
>> DataType producedDataType`
>> fields?
>>
>> Looking forward to your further feedback or guidance.
>>
>> Jingsong Li <[hidden email]> 于2021年1月5日周二 下午2:44写道:
>>
>>> Thanks for your proposal! Sebastian.
>>>
>>> +1 for SupportsAggregatePushDown. The above wonderful discussion has
>>> solved
>>> many of my concerns.
>>>
>>> ## Semantic problems
>>>
>>> We may need to add some mechanisms or comments, because as far as I know,
>>> the semantics of each database is actually different, which may need to
>>> be
>>> reflected in your specific implementation.
>>>
>>> For example, the AVG output types of various databases may be different.
>>> For example, MySQL outputs double, this is different from Flink. What
>>> should we do? (Lucky, avg will be splitted into sum and count, But we
>>> also
>>> need care about decimal and others)
>>>
>>> ## The phase of push-down rule
>>>
>>> I strongly recommend that you do not put it in the Volcano phase, which
>>> may
>>> make the cost calculation very troublesome.
>>> So in PHYSICAL_REWRITE?
>>>
>>> ## About interface
>>>
>>> For scalability, I slightly recommend that we introduce an `Aggregate`
>>> interface, it contains `List<CallExpression> aggregateExpressions, int[]
>>> groupingFields, DataType producedDataType` fields. In this way, we can
>>> add
>>> fields easily without breaking compatibility.
>>>
>>> I think the current design is very good, just put forward some ideas.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <[hidden email]>
>>> wrote:
>>>
>>> > Hi Jark,
>>> >
>>> > Thx for your further feedback and help. The interface of
>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>> >
>>> > For (1) Agree: Yeah, the upstream only need to know if the TableSource
>>> can
>>> > handle all of the aggregates.
>>> > It's better to just return a boolean type to indicate whether all of
>>> > aggregates push down was successful or not. [Resolved in proposal]
>>> >
>>> > For (2) Agree: The aggOutputDataType represent the produced data type
>>> of
>>> > the new table source to make sure that the new table source can
>>> > connect with the related exchange node. The format of this
>>> > aggOutputDataType is groupedFields's type + agg function's return type.
>>> > The reason for adding this parameter in this function is also to
>>> facilitate
>>> > the user to build the final output type. I have changed this parameter
>>> > to be producedDataType. [Resolved in proposal]
>>> >
>>> > For (3) Agree: Indeed, groupSet may mislead users, I have changed to
>>> use
>>> > groupingFields. [Resolved in proposal]
>>> >
>>> > Thx again for the suggestion, looking for the further discussion.
>>> >
>>> > Jark Wu <[hidden email]> 于2021年1月5日周二 下午12:05写道:
>>> >
>>> > > I'm also +1 for idea#2.
>>> > >
>>> > > Regarding to the updated interface,
>>> > >
>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>> > >      int[] groupSet, DataType aggOutputDataType);
>>> > >
>>> > > final class Result {
>>> > >        private final List<CallExpression> acceptedAggregates;
>>> > >        private final List<CallExpression> remainingAggregates;
>>> > > }
>>> > >
>>> > > I have following comments:
>>> > >
>>> > > 1) Do we need the composite Result return type? Is a boolean return
>>> type
>>> > > enough?
>>> > >     From my understanding, all of the aggregates should be accepted,
>>> > > otherwise the pushdown should fail.
>>> > >     Therefore, users don't need to distinguish which aggregates are
>>> > > "accepted".
>>> > >
>>> > > 2) Does the `aggOutputDataType` represent the produced data type of
>>> the
>>> > > new source, or just the return type of all the agg functions?
>>> > >     I would prefer to `producedDataType` just like
>>> > > `SupportsReadingMetadata` to reduce the effort for users to concat a
>>> > final
>>> > > output type.
>>> > >     The return type of each agg function can be obtained from the
>>> > > `CallExpression`.
>>> > >
>>> > > 3) What do you think about renaming `groupSet` to `grouping` or
>>> > > `groupedFields` ?
>>> > >     The `groupSet` may confuse users that it relates to "grouping
>>> sets".
>>> > >
>>> > >
>>> > > What do you think?
>>> > >
>>> > > Best,
>>> > > Jark
>>> > >
>>> > >
>>> > >
>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <[hidden email]> wrote:
>>> > >
>>> > >> Sorry for the typo -_-!
>>> > >> I meant idea #2.
>>> > >>
>>> > >> Best,
>>> > >> Kurt
>>> > >>
>>> > >>
>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>> [hidden email]>
>>> > >> wrote:
>>> > >>
>>> > >>> Hi Kurt,
>>> > >>>
>>> > >>> Thx a lot for your feedback. If local aggregation is more like a
>>> > >>> physical operator rather than logical
>>> > >>> operator, I think your suggestion should be idea #2 which handle
>>> all in
>>> > >>> the physical optimization phase?
>>> > >>>
>>> > >>> Looking forward for the further discussion.
>>> > >>>
>>> > >>>
>>> > >>> Kurt Young <[hidden email]> 于2021年1月5日周二 上午9:52写道:
>>> > >>>
>>> > >>>> Local aggregation is more like a physical operator rather than
>>> logical
>>> > >>>> operator. I would suggest going with idea #1.
>>> > >>>>
>>> > >>>> Best,
>>> > >>>> Kurt
>>> > >>>>
>>> > >>>>
>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>> [hidden email]>
>>> > >>>> wrote:
>>> > >>>>
>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable
>>> suggestions.
>>> > >>>> > For (1): Agree: Since we are in the period of upgrading the new
>>> > table
>>> > >>>> > source api,
>>> > >>>> > we really should consider the new interface for the new optimize
>>> > >>>> rule. If
>>> > >>>> > the new rule
>>> > >>>> > doesn't use the new api, we'll have to upgrade it sooner or
>>> later. I
>>> > >>>> have
>>> > >>>> > change to use
>>> > >>>> > the ability interface for the SupportsAggregatePushDown
>>> definition
>>> > in
>>> > >>>> above
>>> > >>>> > proposal.
>>> > >>>> >
>>> > >>>> > For (2): Agree: Change to use CallExpression is a better
>>> choice, and
>>> > >>>> have
>>> > >>>> > resolved this
>>> > >>>> > comment in the proposal.
>>> > >>>> >
>>> > >>>> > For (3): I suggest we first support the JDBC connector, as we
>>> don't
>>> > >>>> have
>>> > >>>> > Druid connector
>>> > >>>> > and ES connector just has sink api at present.
>>> > >>>> >
>>> > >>>> > But perhaps the biggest question may be whether we should use
>>> idea 1
>>> > >>>> or
>>> > >>>> > idea 2 in proposal.
>>> > >>>> >
>>> > >>>> > What do you think?  After we reach the agreement on the
>>> proposal,
>>> > our
>>> > >>>> team
>>> > >>>> > can drive to
>>> > >>>> > complete this feature.
>>> > >>>> >
>>> > >>>> > Jark Wu <[hidden email]> 于2020年12月29日周二 下午2:58写道:
>>> > >>>> >
>>> > >>>> > > Hi Sebastian,
>>> > >>>> > >
>>> > >>>> > > Thanks for the proposal. I think this is a great improvement
>>> for
>>> > >>>> Flink
>>> > >>>> > SQL.
>>> > >>>> > > I went through the design doc and have following thoughts:
>>> > >>>> > >
>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
>>> > proposed
>>> > >>>> a new
>>> > >>>> > >  set of DynamicTableSource interfaces. Could you update your
>>> > >>>> proposal to
>>> > >>>> > > use the new interfaces?
>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>> > >>>> > >
>>> > >>>> > > 2) Personally, I think CallExpression would be a better
>>> > >>>> representation
>>> > >>>> > than
>>> > >>>> > > separate `FunctionDefinition` and args. Because, it would be
>>> > easier
>>> > >>>> to
>>> > >>>> > know
>>> > >>>> > > what's the index and type of the arguments.
>>> > >>>> > >
>>> > >>>> > > 3) It would be better to list which connectors will be
>>> supported
>>> > in
>>> > >>>> the
>>> > >>>> > > plan?
>>> > >>>> > >
>>> > >>>> > > Best,
>>> > >>>> > > Jark
>>> > >>>> > >
>>> > >>>> > >
>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>> > [hidden email]>
>>> > >>>> > wrote:
>>> > >>>> > >
>>> > >>>> > > > Hi all,
>>> > >>>> > > >
>>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>> > >>>> > > > Aggregate operator of Flink SQL is currently fully done at
>>> Flink
>>> > >>>> layer.
>>> > >>>> > > > With the developing of storage, many downstream storage of
>>> Flink
>>> > >>>> SQL
>>> > >>>> > has
>>> > >>>> > > > the ability to deal with Aggregation operator.
>>> > >>>> > > > Pushing down Aggregate to data source layer will improve
>>> > >>>> performance
>>> > >>>> > from
>>> > >>>> > > > the perspective of the network IO and computation overhead.
>>> > >>>> > > >
>>> > >>>> > > > I have drafted a design doc for this new feature.
>>> > >>>> > > >
>>> > >>>> > > >
>>> > >>>> > >
>>> > >>>> >
>>> > >>>>
>>> >
>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>> > >>>> > > >
>>> > >>>> > > > Any comment or discussion is welcome.
>>> > >>>> > > >
>>> > >>>> > > > --
>>> > >>>> > > >
>>> > >>>> > > > *With kind regards
>>> > >>>> > > > ------------------------------------------------------------
>>> > >>>> > > > Sebastian Liu 刘洋
>>> > >>>> > > > Institute of Computing Technology, Chinese Academy of
>>> Science
>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>> > >>>> > > > E-mail: [hidden email] <[hidden email]>
>>> > >>>> > > > QQ: 3239559*
>>> > >>>> > > >
>>> > >>>> > >
>>> > >>>> >
>>> > >>>> >
>>> > >>>> > --
>>> > >>>> >
>>> > >>>> > *With kind regards
>>> > >>>> > ------------------------------------------------------------
>>> > >>>> > Sebastian Liu 刘洋
>>> > >>>> > Institute of Computing Technology, Chinese Academy of Science
>>> > >>>> > Mobile\WeChat: +86—15201613655
>>> > >>>> > E-mail: [hidden email] <[hidden email]>
>>> > >>>> > QQ: 3239559*
>>> > >>>> >
>>> > >>>>
>>> > >>>
>>> > >>>
>>> > >>> --
>>> > >>>
>>> > >>> *With kind regards
>>> > >>> ------------------------------------------------------------
>>> > >>> Sebastian Liu 刘洋
>>> > >>> Institute of Computing Technology, Chinese Academy of Science
>>> > >>> Mobile\WeChat: +86—15201613655
>>> > >>> E-mail: [hidden email] <[hidden email]>
>>> > >>> QQ: 3239559*
>>> > >>>
>>> > >>>
>>> >
>>> > --
>>> >
>>> > *With kind regards
>>> > ------------------------------------------------------------
>>> > Sebastian Liu 刘洋
>>> > Institute of Computing Technology, Chinese Academy of Science
>>> > Mobile\WeChat: +86—15201613655
>>> > E-mail: [hidden email] <[hidden email]>
>>> > QQ: 3239559*
>>> >
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>>
>> --
>>
>> *With kind regards
>> ------------------------------------------------------------
>> Sebastian Liu 刘洋
>> Institute of Computing Technology, Chinese Academy of Science
>> Mobile\WeChat: +86—15201613655
>> E-mail: [hidden email] <[hidden email]>
>> QQ: 3239559*
>>
>>
>
> --
> Best, Jingsong Lee
>

Re: Support local aggregate push down for Blink batch planner

Jingsong Li
Hi Jark,

I don't want to limit this interface to local aggregate push-down. Actually,
sometimes we can push the whole aggregation down to the source too.

So this rule can do something more advanced. For example, we can also push
down grouping sets to the source, for SQL like "GROUP BY GROUPING SETS (f1,
f2)". Then we need to add more information to the push-down.
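
Purely as an illustration of that "more information" (none of these names are
part of the proposal), grouping sets could be carried like this:

```
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.types.DataType;

import java.util.List;

// Hypothetical sketch only. For "GROUP BY GROUPING SETS (f1, f2)", groupingSets()
// could return two sets, each holding the field index of f1 or f2, while
// groupingFields() still returns the union of all grouped fields.
interface Aggregation {
    int[] groupingFields();

    int[][] groupingSets();

    List<CallExpression> aggregateExpressions();

    DataType producedDataType();
}
```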

Best,
Jingsong

On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <[hidden email]> wrote:

> I think this may be over designed. We should have confidence in the
> interface we design, the interface should be stable.
> Wrapping things in a big context has a cost of losing user convenience.
> Foremost, we don't see any parameters to add in the future. Do you know
> any potential parameters?
>
> Best,
> Jark
>
> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <[hidden email]> wrote:
>
>> Hi Sebastian,
>>
>> Well, I mean:
>>
>> `boolean applyAggregates(int[] groupingFields, List<CallExpression>
>> aggregateExpressions, DataType producedDataType);`
>> VS
>> ```
>> boolean applyAggregates(Aggregation agg);
>>
>> interface Aggregation {
>>   int[] groupingFields();
>>   List<CallExpression> aggregateExpressions();
>>   DataType producedDataType();
>> }
>> ```
>>
>> Maybe I've over considered it, but I think Aggregation is a complicated
>> thing. Maybe we need to extend its parameters in the future, so make the
>> parameters interface, which is conducive to the future expansion without
>> destroying the compatibility of user implementation. If it is the way
>> before, users need to modify the code.
>>
>> Best,
>> Jingsong
>>
>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <[hidden email]>
>> wrote:
>>
>>> Hi Jinsong,
>>>
>>> Thx a lot for your suggestion. These points really need to be clear in
>>> the proposal.
>>>
>>> For the semantic problem, I think the main point is the different
>>> returned data types
>>> for the target aggregate function and the row format returned by the
>>> underlying storage.
>>> That's why we provide the producedDataType in the
>>> SupportsAggregatePushDown interface.
>>> Need to let developers know that we need to handle the semantic
>>> differences between
>>> the underlying storage system and Flink in related connectors.
>>> [Supplemented in proposal]
>>>
>>> For the phase of the new PushLocalAggIntoTableSourceScanRule rule, it's also
>>> a key point.
>>> As you suggested, we should put it into the PHYSICAL_REWRITE rule set,
>>> and better to put it
>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>
>>> For the scalability of the interface, actually I don't exactly
>>> understand your suggestion. Is it to add
>>> an abstract class, to implement the SupportsAggregatePushDown interface,
>>> and holds the
>>> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
>>> DataType producedDataType`
>>> fields?
>>>
>>> Looking forward to your further feedback or guidance.
>>>
>>> Jingsong Li <[hidden email]> 于2021年1月5日周二 下午2:44写道:
>>>
>>>> Thanks for your proposal! Sebastian.
>>>>
>>>> +1 for SupportsAggregatePushDown. The above wonderful discussion has
>>>> solved
>>>> many of my concerns.
>>>>
>>>> ## Semantic problems
>>>>
>>>> We may need to add some mechanisms or comments, because as far as I
>>>> know,
>>>> the semantics of each database is actually different, which may need to
>>>> be
>>>> reflected in your specific implementation.
>>>>
>>>> For example, the AVG output types of various databases may be different.
>>>> For example, MySQL outputs double, this is different from Flink. What
>>>> should we do? (Lucky, avg will be splitted into sum and count, But we
>>>> also
>>>> need care about decimal and others)
>>>>
>>>> ## The phase of push-down rule
>>>>
>>>> I strongly recommend that you do not put it in the Volcano phase, which
>>>> may
>>>> make the cost calculation very troublesome.
>>>> So in PHYSICAL_REWRITE?
>>>>
>>>> ## About interface
>>>>
>>>> For scalability, I slightly recommend that we introduce an `Aggregate`
>>>> interface, it contains `List<CallExpression> aggregateExpressions, int[]
>>>> groupingFields, DataType producedDataType` fields. In this way, we can
>>>> add
>>>> fields easily without breaking compatibility.
>>>>
>>>> I think the current design is very good, just put forward some ideas.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <[hidden email]>
>>>> wrote:
>>>>
>>>> > Hi Jark,
>>>> >
>>>> > Thx for your further feedback and help. The interface of
>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>> >
>>>> > For (1) Agree: Yeah, the upstream only need to know if the
>>>> TableSource can
>>>> > handle all of the aggregates.
>>>> > It's better to just return a boolean type to indicate whether all of
>>>> > aggregates push down was successful or not. [Resolved in proposal]
>>>> >
>>>> > For (2) Agree: The aggOutputDataType represent the produced data type
>>>> of
>>>> > the new table source to make sure that the new table source can
>>>> > connect with the related exchange node. The format of this
>>>> > aggOutputDataType is groupedFields's type + agg function's return
>>>> type.
>>>> > The reason for adding this parameter in this function is also to
>>>> facilitate
>>>> > the user to build the final output type. I have changed this parameter
>>>> > to be producedDataType. [Resolved in proposal]
>>>> >
>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have changed to
>>>> use
>>>> > groupingFields. [Resolved in proposal]
>>>> >
>>>> > Thx again for the suggestion, looking for the further discussion.
>>>> >
>>>> > Jark Wu <[hidden email]> 于2021年1月5日周二 下午12:05写道:
>>>> >
>>>> > > I'm also +1 for idea#2.
>>>> > >
>>>> > > Regarding to the updated interface,
>>>> > >
>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>> > >
>>>> > > final class Result {
>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>> > >        private final List<CallExpression> remainingAggregates;
>>>> > > }
>>>> > >
>>>> > > I have following comments:
>>>> > >
>>>> > > 1) Do we need the composite Result return type? Is a boolean return
>>>> type
>>>> > > enough?
>>>> > >     From my understanding, all of the aggregates should be accepted,
>>>> > > otherwise the pushdown should fail.
>>>> > >     Therefore, users don't need to distinguish which aggregates are
>>>> > > "accepted".
>>>> > >
>>>> > > 2) Does the `aggOutputDataType` represent the produced data type of
>>>> the
>>>> > > new source, or just the return type of all the agg functions?
>>>> > >     I would prefer to `producedDataType` just like
>>>> > > `SupportsReadingMetadata` to reduce the effort for users to concat a
>>>> > final
>>>> > > output type.
>>>> > >     The return type of each agg function can be obtained from the
>>>> > > `CallExpression`.
>>>> > >
>>>> > > 3) What do you think about renaming `groupSet` to `grouping` or
>>>> > > `groupedFields` ?
>>>> > >     The `groupSet` may confuse users that it relates to "grouping
>>>> sets".
>>>> > >
>>>> > >
>>>> > > What do you think?
>>>> > >
>>>> > > Best,
>>>> > > Jark
>>>> > >
>>>> > >
>>>> > >
>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <[hidden email]> wrote:
>>>> > >
>>>> > >> Sorry for the typo -_-!
>>>> > >> I meant idea #2.
>>>> > >>
>>>> > >> Best,
>>>> > >> Kurt
>>>> > >>
>>>> > >>
>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>> [hidden email]>
>>>> > >> wrote:
>>>> > >>
>>>> > >>> Hi Kurt,
>>>> > >>>
>>>> > >>> Thx a lot for your feedback. If local aggregation is more like a
>>>> > >>> physical operator rather than logical
>>>> > >>> operator, I think your suggestion should be idea #2 which handle
>>>> all in
>>>> > >>> the physical optimization phase?
>>>> > >>>
>>>> > >>> Looking forward for the further discussion.
>>>> > >>>
>>>> > >>>
>>>> > >>> Kurt Young <[hidden email]> 于2021年1月5日周二 上午9:52写道:
>>>> > >>>
>>>> > >>>> Local aggregation is more like a physical operator rather than
>>>> logical
>>>> > >>>> operator. I would suggest going with idea #1.
>>>> > >>>>
>>>> > >>>> Best,
>>>> > >>>> Kurt
>>>> > >>>>
>>>> > >>>>
>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>> [hidden email]>
>>>> > >>>> wrote:
>>>> > >>>>
>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable
>>>> suggestions.
>>>> > >>>> > For (1): Agree: Since we are in the period of upgrading the new
>>>> > table
>>>> > >>>> > source api,
>>>> > >>>> > we really should consider the new interface for the new
>>>> optimize
>>>> > >>>> rule. If
>>>> > >>>> > the new rule
>>>> > >>>> > doesn't use the new api, we'll have to upgrade it sooner or
>>>> later. I
>>>> > >>>> have
>>>> > >>>> > change to use
>>>> > >>>> > the ability interface for the SupportsAggregatePushDown
>>>> definition
>>>> > in
>>>> > >>>> above
>>>> > >>>> > proposal.
>>>> > >>>> >
>>>> > >>>> > For (2): Agree: Change to use CallExpression is a better
>>>> choice, and
>>>> > >>>> have
>>>> > >>>> > resolved this
>>>> > >>>> > comment in the proposal.
>>>> > >>>> >
>>>> > >>>> > For (3): I suggest we first support the JDBC connector, as we
>>>> don't
>>>> > >>>> have
>>>> > >>>> > Druid connector
>>>> > >>>> > and ES connector just has sink api at present.
>>>> > >>>> >
>>>> > >>>> > But perhaps the biggest question may be whether we should use
>>>> idea 1
>>>> > >>>> or
>>>> > >>>> > idea 2 in proposal.
>>>> > >>>> >
>>>> > >>>> > What do you think?  After we reach the agreement on the
>>>> proposal,
>>>> > our
>>>> > >>>> team
>>>> > >>>> > can drive to
>>>> > >>>> > complete this feature.
>>>> > >>>> >
>>>> > >>>> > Jark Wu <[hidden email]> 于2020年12月29日周二 下午2:58写道:
>>>> > >>>> >
>>>> > >>>> > > Hi Sebastian,
>>>> > >>>> > >
>>>> > >>>> > > Thanks for the proposal. I think this is a great improvement
>>>> for
>>>> > >>>> Flink
>>>> > >>>> > SQL.
>>>> > >>>> > > I went through the design doc and have following thoughts:
>>>> > >>>> > >
>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
>>>> > proposed
>>>> > >>>> a new
>>>> > >>>> > >  set of DynamicTableSource interfaces. Could you update your
>>>> > >>>> proposal to
>>>> > >>>> > > use the new interfaces?
>>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>> > >>>> > >
>>>> > >>>> > > 2) Personally, I think CallExpression would be a better
>>>> > >>>> representation
>>>> > >>>> > than
>>>> > >>>> > > separate `FunctionDefinition` and args. Because, it would be
>>>> > easier
>>>> > >>>> to
>>>> > >>>> > know
>>>> > >>>> > > what's the index and type of the arguments.
>>>> > >>>> > >
>>>> > >>>> > > 3) It would be better to list which connectors will be
>>>> supported
>>>> > in
>>>> > >>>> the
>>>> > >>>> > > plan?
>>>> > >>>> > >
>>>> > >>>> > > Best,
>>>> > >>>> > > Jark
>>>> > >>>> > >
>>>> > >>>> > >
>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>> > [hidden email]>
>>>> > >>>> > wrote:
>>>> > >>>> > >
>>>> > >>>> > > > Hi all,
>>>> > >>>> > > >
>>>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>>> > >>>> > > > Aggregate operator of Flink SQL is currently fully done at
>>>> Flink
>>>> > >>>> layer.
>>>> > >>>> > > > With the developing of storage, many downstream storage of
>>>> Flink
>>>> > >>>> SQL
>>>> > >>>> > has
>>>> > >>>> > > > the ability to deal with Aggregation operator.
>>>> > >>>> > > > Pushing down Aggregate to data source layer will improve
>>>> > >>>> performance
>>>> > >>>> > from
>>>> > >>>> > > > the perspective of the network IO and computation overhead.
>>>> > >>>> > > >
>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>> > >>>> > > >
>>>> > >>>> > > >
>>>> > >>>> > >
>>>> > >>>> >
>>>> > >>>>
>>>> >
>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>> > >>>> > > >
>>>> > >>>> > > > Any comment or discussion is welcome.
>>>> > >>>> > > >
>>>> > >>>> > > > --
>>>> > >>>> > > >
>>>> > >>>> > > > *With kind regards
>>>> > >>>> > > >
>>>> ------------------------------------------------------------
>>>> > >>>> > > > Sebastian Liu 刘洋
>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy of
>>>> Science
>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>> > >>>> > > > E-mail: [hidden email] <[hidden email]>
>>>> > >>>> > > > QQ: 3239559*
>>>> > >>>> > > >
>>>> > >>>> > >
>>>> > >>>> >
>>>> > >>>> >
>>>> > >>>> > --
>>>> > >>>> >
>>>> > >>>> > *With kind regards
>>>> > >>>> > ------------------------------------------------------------
>>>> > >>>> > Sebastian Liu 刘洋
>>>> > >>>> > Institute of Computing Technology, Chinese Academy of Science
>>>> > >>>> > Mobile\WeChat: +86—15201613655
>>>> > >>>> > E-mail: [hidden email] <[hidden email]>
>>>> > >>>> > QQ: 3239559*
>>>> > >>>> >
>>>> > >>>>
>>>> > >>>
>>>> > >>>
>>>> > >>> --
>>>> > >>>
>>>> > >>> *With kind regards
>>>> > >>> ------------------------------------------------------------
>>>> > >>> Sebastian Liu 刘洋
>>>> > >>> Institute of Computing Technology, Chinese Academy of Science
>>>> > >>> Mobile\WeChat: +86—15201613655
>>>> > >>> E-mail: [hidden email] <[hidden email]>
>>>> > >>> QQ: 3239559*
>>>> > >>>
>>>> > >>>
>>>> >
>>>> > --
>>>> >
>>>> > *With kind regards
>>>> > ------------------------------------------------------------
>>>> > Sebastian Liu 刘洋
>>>> > Institute of Computing Technology, Chinese Academy of Science
>>>> > Mobile\WeChat: +86—15201613655
>>>> > E-mail: [hidden email] <[hidden email]>
>>>> > QQ: 3239559*
>>>> >
>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>
>>>
>>> --
>>>
>>> *With kind regards
>>> ------------------------------------------------------------
>>> Sebastian Liu 刘洋
>>> Institute of Computing Technology, Chinese Academy of Science
>>> Mobile\WeChat: +86—15201613655
>>> E-mail: [hidden email] <[hidden email]>
>>> QQ: 3239559*
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

--
Best, Jingsong Lee

Re: Support local aggregate push down for Blink batch planner

Jingsong Li
Hi,

I'm also curious about aggregates with a filter, e.g. COUNT(1) FILTER (WHERE
d > 1). Can we push them down? I'm not sure that a single call expression can
express this, or how we should represent it and convey it to users.
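
Just to illustrate the question (hypothetical, not a proposal): one option
would be to pair each aggregate call with an optional filter expression, e.g.:

```
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;

import java.util.Optional;

// Hypothetical sketch only: conveys COUNT(1) FILTER (WHERE d > 1) as the COUNT
// call plus a separate filter expression, since a single CallExpression may not
// be able to carry the FILTER clause on its own.
final class FilteredAggregate {

    private final CallExpression aggregateCall;
    private final Optional<ResolvedExpression> filter;

    FilteredAggregate(CallExpression aggregateCall, Optional<ResolvedExpression> filter) {
        this.aggregateCall = aggregateCall;
        this.filter = filter;
    }

    CallExpression aggregateCall() {
        return aggregateCall;
    }

    Optional<ResolvedExpression> filter() {
        return filter;
    }
}
```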

Best,
Jingsong

On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <[hidden email]> wrote:

> Hi Jark,
>
> I don't want to limit this interface to LocalAgg Push down. Actually,
> sometimes, we can push whole aggregation to source too.
>
> So, this rule can do something more advanced. For example, we can push
> down group sets to source too, for the SQL: "GROUP BY GROUPING SETS (f1,
> f2)". Then, we need to add more information to push down.
>
> Best,
> Jingsong
>
> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <[hidden email]> wrote:
>
>> I think this may be over designed. We should have confidence in the
>> interface we design, the interface should be stable.
>> Wrapping things in a big context has a cost of losing user convenience.
>> Foremost, we don't see any parameters to add in the future. Do you know
>> any potential parameters?
>>
>> Best,
>> Jark
>>
>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <[hidden email]> wrote:
>>
>>> Hi Sebastian,
>>>
>>> Well, I mean:
>>>
>>> `boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>> aggregateExpressions, DataType producedDataType);`
>>> VS
>>> ```
>>> boolean applyAggregates(Aggregation agg);
>>>
>>> interface Aggregation {
>>>   int[] groupingFields();
>>>   List<CallExpression> aggregateExpressions();
>>>   DataType producedDataType();
>>> }
>>> ```
>>>
>>> Maybe I've over considered it, but I think Aggregation is a complicated
>>> thing. Maybe we need to extend its parameters in the future, so make the
>>> parameters interface, which is conducive to the future expansion without
>>> destroying the compatibility of user implementation. If it is the way
>>> before, users need to modify the code.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <[hidden email]>
>>> wrote:
>>>
>>>> Hi Jinsong,
>>>>
>>>> Thx a lot for your suggestion. These points really need to be clear in
>>>> the proposal.
>>>>
>>>> For the semantic problem, I think the main point is the different
>>>> returned data types
>>>> for the target aggregate function and the row format returned by the
>>>> underlying storage.
>>>> That's why we provide the producedDataType in the
>>>> SupportsAggregatePushDown interface.
>>>> Need to let developers know that we need to handle the semantic
>>>> differences between
>>>> the underlying storage system and Flink in related connectors.
>>>> [Supplemented in proposal]
>>>>
>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule rule, it's also
>>>> a key point.
>>>> As you suggested, we should put it into the PHYSICAL_REWRITE rule set,
>>>> and better to put it
>>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>
>>>> For the scalability of the interface, actually I don't exactly
>>>> understand your suggestion. Is it to add
>>>> an abstract class, to implement the SupportsAggregatePushDown
>>>> interface, and holds the
>>>> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
>>>> DataType producedDataType`
>>>> fields?
>>>>
>>>> Looking forward to your further feedback or guidance.
>>>>
>>>> Jingsong Li <[hidden email]> 于2021年1月5日周二 下午2:44写道:
>>>>
>>>>> Thanks for your proposal! Sebastian.
>>>>>
>>>>> +1 for SupportsAggregatePushDown. The above wonderful discussion has
>>>>> solved
>>>>> many of my concerns.
>>>>>
>>>>> ## Semantic problems
>>>>>
>>>>> We may need to add some mechanisms or comments, because as far as I
>>>>> know,
>>>>> the semantics of each database is actually different, which may need
>>>>> to be
>>>>> reflected in your specific implementation.
>>>>>
>>>>> For example, the AVG output types of various databases may be
>>>>> different.
>>>>> For example, MySQL outputs double, this is different from Flink. What
>>>>> should we do? (Lucky, avg will be splitted into sum and count, But we
>>>>> also
>>>>> need care about decimal and others)
>>>>>
>>>>> ## The phase of push-down rule
>>>>>
>>>>> I strongly recommend that you do not put it in the Volcano phase,
>>>>> which may
>>>>> make the cost calculation very troublesome.
>>>>> So in PHYSICAL_REWRITE?
>>>>>
>>>>> ## About interface
>>>>>
>>>>> For scalability, I slightly recommend that we introduce an `Aggregate`
>>>>> interface, it contains `List<CallExpression> aggregateExpressions,
>>>>> int[]
>>>>> groupingFields, DataType producedDataType` fields. In this way, we can
>>>>> add
>>>>> fields easily without breaking compatibility.
>>>>>
>>>>> I think the current design is very good, just put forward some ideas.
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <[hidden email]>
>>>>> wrote:
>>>>>
>>>>> > Hi Jark,
>>>>> >
>>>>> > Thx for your further feedback and help. The interface of
>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>> >
>>>>> > For (1) Agree: Yeah, the upstream only need to know if the
>>>>> TableSource can
>>>>> > handle all of the aggregates.
>>>>> > It's better to just return a boolean type to indicate whether all of
>>>>> > aggregates push down was successful or not. [Resolved in proposal]
>>>>> >
>>>>> > For (2) Agree: The aggOutputDataType represent the produced data
>>>>> type of
>>>>> > the new table source to make sure that the new table source can
>>>>> > connect with the related exchange node. The format of this
>>>>> > aggOutputDataType is groupedFields's type + agg function's return
>>>>> type.
>>>>> > The reason for adding this parameter in this function is also to
>>>>> facilitate
>>>>> > the user to build the final output type. I have changed this
>>>>> parameter
>>>>> > to be producedDataType. [Resolved in proposal]
>>>>> >
>>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have changed to
>>>>> use
>>>>> > groupingFields. [Resolved in proposal]
>>>>> >
>>>>> > Thx again for the suggestion, looking for the further discussion.
>>>>> >
>>>>> > Jark Wu <[hidden email]> 于2021年1月5日周二 下午12:05写道:
>>>>> >
>>>>> > > I'm also +1 for idea#2.
>>>>> > >
>>>>> > > Regarding to the updated interface,
>>>>> > >
>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>> > >
>>>>> > > final class Result {
>>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>>> > >        private final List<CallExpression> remainingAggregates;
>>>>> > > }
>>>>> > >
>>>>> > > I have following comments:
>>>>> > >
>>>>> > > 1) Do we need the composite Result return type? Is a boolean
>>>>> return type
>>>>> > > enough?
>>>>> > >     From my understanding, all of the aggregates should be
>>>>> accepted,
>>>>> > > otherwise the pushdown should fail.
>>>>> > >     Therefore, users don't need to distinguish which aggregates are
>>>>> > > "accepted".
>>>>> > >
>>>>> > > 2) Does the `aggOutputDataType` represent the produced data type
>>>>> of the
>>>>> > > new source, or just the return type of all the agg functions?
>>>>> > >     I would prefer to `producedDataType` just like
>>>>> > > `SupportsReadingMetadata` to reduce the effort for users to concat
>>>>> a
>>>>> > final
>>>>> > > output type.
>>>>> > >     The return type of each agg function can be obtained from the
>>>>> > > `CallExpression`.
>>>>> > >
>>>>> > > 3) What do you think about renaming `groupSet` to `grouping` or
>>>>> > > `groupedFields` ?
>>>>> > >     The `groupSet` may confuse users that it relates to "grouping
>>>>> sets".
>>>>> > >
>>>>> > >
>>>>> > > What do you think?
>>>>> > >
>>>>> > > Best,
>>>>> > > Jark
>>>>> > >
>>>>> > >
>>>>> > >
>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <[hidden email]> wrote:
>>>>> > >
>>>>> > >> Sorry for the typo -_-!
>>>>> > >> I meant idea #2.
>>>>> > >>
>>>>> > >> Best,
>>>>> > >> Kurt
>>>>> > >>
>>>>> > >>
>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>> [hidden email]>
>>>>> > >> wrote:
>>>>> > >>
>>>>> > >>> Hi Kurt,
>>>>> > >>>
>>>>> > >>> Thx a lot for your feedback. If local aggregation is more like a
>>>>> > >>> physical operator rather than logical
>>>>> > >>> operator, I think your suggestion should be idea #2 which handle
>>>>> all in
>>>>> > >>> the physical optimization phase?
>>>>> > >>>
>>>>> > >>> Looking forward for the further discussion.
>>>>> > >>>
>>>>> > >>>
>>>>> > >>> Kurt Young <[hidden email]> 于2021年1月5日周二 上午9:52写道:
>>>>> > >>>
>>>>> > >>>> Local aggregation is more like a physical operator rather than
>>>>> logical
>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>> > >>>>
>>>>> > >>>> Best,
>>>>> > >>>> Kurt
>>>>> > >>>>
>>>>> > >>>>
>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>> [hidden email]>
>>>>> > >>>> wrote:
>>>>> > >>>>
>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable
>>>>> suggestions.
>>>>> > >>>> > For (1): Agree: Since we are in the period of upgrading the
>>>>> new
>>>>> > table
>>>>> > >>>> > source api,
>>>>> > >>>> > we really should consider the new interface for the new
>>>>> optimize
>>>>> > >>>> rule. If
>>>>> > >>>> > the new rule
>>>>> > >>>> > doesn't use the new api, we'll have to upgrade it sooner or
>>>>> later. I
>>>>> > >>>> have
>>>>> > >>>> > change to use
>>>>> > >>>> > the ability interface for the SupportsAggregatePushDown
>>>>> definition
>>>>> > in
>>>>> > >>>> above
>>>>> > >>>> > proposal.
>>>>> > >>>> >
>>>>> > >>>> > For (2): Agree: Change to use CallExpression is a better
>>>>> choice, and
>>>>> > >>>> have
>>>>> > >>>> > resolved this
>>>>> > >>>> > comment in the proposal.
>>>>> > >>>> >
>>>>> > >>>> > For (3): I suggest we first support the JDBC connector, as we
>>>>> don't
>>>>> > >>>> have
>>>>> > >>>> > Druid connector
>>>>> > >>>> > and ES connector just has sink api at present.
>>>>> > >>>> >
>>>>> > >>>> > But perhaps the biggest question may be whether we should use
>>>>> idea 1
>>>>> > >>>> or
>>>>> > >>>> > idea 2 in proposal.
>>>>> > >>>> >
>>>>> > >>>> > What do you think?  After we reach the agreement on the
>>>>> proposal,
>>>>> > our
>>>>> > >>>> team
>>>>> > >>>> > can drive to
>>>>> > >>>> > complete this feature.
>>>>> > >>>> >
>>>>> > >>>> > Jark Wu <[hidden email]> 于2020年12月29日周二 下午2:58写道:
>>>>> > >>>> >
>>>>> > >>>> > > Hi Sebastian,
>>>>> > >>>> > >
>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>> improvement for
>>>>> > >>>> Flink
>>>>> > >>>> > SQL.
>>>>> > >>>> > > I went through the design doc and have following thoughts:
>>>>> > >>>> > >
>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
>>>>> > proposed
>>>>> > >>>> a new
>>>>> > >>>> > >  set of DynamicTableSource interfaces. Could you update your
>>>>> > >>>> proposal to
>>>>> > >>>> > > use the new interfaces?
>>>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>> > >>>> > >
>>>>> > >>>> > > 2) Personally, I think CallExpression would be a better
>>>>> > >>>> representation
>>>>> > >>>> > than
>>>>> > >>>> > > separate `FunctionDefinition` and args. Because, it would be
>>>>> > easier
>>>>> > >>>> to
>>>>> > >>>> > know
>>>>> > >>>> > > what's the index and type of the arguments.
>>>>> > >>>> > >
>>>>> > >>>> > > 3) It would be better to list which connectors will be
>>>>> supported
>>>>> > in
>>>>> > >>>> the
>>>>> > >>>> > > plan?
>>>>> > >>>> > >
>>>>> > >>>> > > Best,
>>>>> > >>>> > > Jark
>>>>> > >>>> > >
>>>>> > >>>> > >
>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>> > [hidden email]>
>>>>> > >>>> > wrote:
>>>>> > >>>> > >
>>>>> > >>>> > > > Hi all,
>>>>> > >>>> > > >
>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>>>> > >>>> > > > Aggregate operator of Flink SQL is currently fully done
>>>>> at Flink
>>>>> > >>>> layer.
>>>>> > >>>> > > > With the developing of storage, many downstream storage
>>>>> of Flink
>>>>> > >>>> SQL
>>>>> > >>>> > has
>>>>> > >>>> > > > the ability to deal with Aggregation operator.
>>>>> > >>>> > > > Pushing down Aggregate to data source layer will improve
>>>>> > >>>> performance
>>>>> > >>>> > from
>>>>> > >>>> > > > the perspective of the network IO and computation
>>>>> overhead.
>>>>> > >>>> > > >
>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>> > >>>> > > >
>>>>> > >>>> > > >
>>>>> > >>>> > >
>>>>> > >>>> >
>>>>> > >>>>
>>>>> >
>>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>> > >>>> > > >
>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>> > >>>> > > >
>>>>> > >>>> > > > --
>>>>> > >>>> > > >
>>>>> > >>>> > > > *With kind regards
>>>>> > >>>> > > >
>>>>> ------------------------------------------------------------
>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy of
>>>>> Science
>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>> > >>>> > > > E-mail: [hidden email] <[hidden email]>
>>>>> > >>>> > > > QQ: 3239559*
>>>>> > >>>> > > >
>>>>> > >>>> > >
>>>>> > >>>> >
>>>>> > >>>> >
>>>>> > >>>> > --
>>>>> > >>>> >
>>>>> > >>>> > *With kind regards
>>>>> > >>>> > ------------------------------------------------------------
>>>>> > >>>> > Sebastian Liu 刘洋
>>>>> > >>>> > Institute of Computing Technology, Chinese Academy of Science
>>>>> > >>>> > Mobile\WeChat: +86—15201613655
>>>>> > >>>> > E-mail: [hidden email] <[hidden email]>
>>>>> > >>>> > QQ: 3239559*
>>>>> > >>>> >
>>>>> > >>>>
>>>>> > >>>
>>>>> > >>>
>>>>> > >>> --
>>>>> > >>>
>>>>> > >>> *With kind regards
>>>>> > >>> ------------------------------------------------------------
>>>>> > >>> Sebastian Liu 刘洋
>>>>> > >>> Institute of Computing Technology, Chinese Academy of Science
>>>>> > >>> Mobile\WeChat: +86—15201613655
>>>>> > >>> E-mail: [hidden email] <[hidden email]>
>>>>> > >>> QQ: 3239559*
>>>>> > >>>
>>>>> > >>>
>>>>> >
>>>>> > --
>>>>> >
>>>>> > *With kind regards
>>>>> > ------------------------------------------------------------
>>>>> > Sebastian Liu 刘洋
>>>>> > Institute of Computing Technology, Chinese Academy of Science
>>>>> > Mobile\WeChat: +86—15201613655
>>>>> > E-mail: [hidden email] <[hidden email]>
>>>>> > QQ: 3239559*
>>>>> >
>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> *With kind regards
>>>> ------------------------------------------------------------
>>>> Sebastian Liu 刘洋
>>>> Institute of Computing Technology, Chinese Academy of Science
>>>> Mobile\WeChat: +86—15201613655
>>>> E-mail: [hidden email] <[hidden email]>
>>>> QQ: 3239559*
>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>


--
Best, Jingsong Lee

Re: Support local aggregate push down for Blink batch planner

Jark Wu-2
I think filter expressions and grouping sets are semantic arguments instead
of utilities.
If we want to push them into sources, the connector developers should be
aware of them.
Wrapping them in a context implicitly is error-prone: an existing connector
could silently produce wrong results when upgrading to a new Flink version
(we would be pushing grouping_sets/filter_args, but the connector ignores them).
I think for these cases, providing a new default method to override might
be a better choice.
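
To make the default-method idea concrete, here is a minimal sketch. The
method name `applyAggregatesWithGroupingSets` and its parameter list are
only placeholders I made up for illustration, not a concrete proposal:

```
import java.util.List;

import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.types.DataType;

// Sketch only: a later extension added as a default method keeps existing
// connector implementations source compatible, and they explicitly refuse
// the new capability instead of silently ignoring it.
public interface SupportsAggregatePushDown {

    // The method currently under discussion in this thread.
    boolean applyAggregates(
            int[] groupingFields,
            List<CallExpression> aggregateExpressions,
            DataType producedDataType);

    // Hypothetical future extension for grouping sets. Connectors that do
    // not override it reject the push-down, so the planner falls back to the
    // normal local aggregate and no wrong result can be produced.
    default boolean applyAggregatesWithGroupingSets(
            List<int[]> groupingSets,
            List<CallExpression> aggregateExpressions,
            DataType producedDataType) {
        return false;
    }
}
```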

Best,
Jark


Re: Support local aggregate push down for Blink batch planner

Jingsong Li
> I think filter expressions and grouping sets are semantic arguments
> instead of utilities. If we want to push them into sources, the connector
> developers should be aware of them. Wrapping them in a context implicitly is
> error-prone: the existing connector will produce wrong results when
> upgrading to new Flink versions.

We can have some mechanism to check this during the upgrade.

> I think for these cases, providing a new default method to override might
> be a better choice.

Then we will have three or more methods. For the API level, I really don't
like that...

Best,
Jingsong


--
Best, Jingsong Lee

Re: Support local aggregate push down for Blink batch planner

Sebastian Liu
Hi Jingsong, Jark,

Thx so much for our discussion; the cases mentioned above are really
worth further discussion.

1. Aggregate with filter expressions, e.g. select COUNT(1) FILTER(WHERE
cc_call_center_sk > 3) from call_center;
For the current Blink Planner, the optimized plan will be:
TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg ->
Exchange -> FinalAgg.
Since there is a Calc between the TableSourceScan and the LocalAgg, this
pattern can't match the LocalAggPushDownRule in the current design.

2. Grouping sets or rollup, e.g. select COUNT(1) from
call_center group by rollup(cc_class, cc_employees);
For the current Blink Planner, the optimized plan will be:
TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
This is also not covered by the current LocalAggPushDownRule design.

3. I want to add a case which we haven't discussed yet: aggregate with a
HAVING clause,
e.g. select COUNT(1) from call_center group by cc_class having
max(cc_tax_percentage) > 0.2;
For the current Blink Planner, the optimized plan will be:
TableSourceScan -> LocalAgg -> Exchange -> FinalAgg -> Calc(where=[>($f2,
0.2:DECIMAL(2, 1))]).

The core discussion points are summarized as follows:
a) Aggregate is a more complex scenario than predicates or limits, and it
also depends on the capabilities of the different underlying storages.
b) One rule doesn't seem able to cover all aggregate scenarios completely,
but could the SupportsAggregatePushDown interface be a bit more general?
c) Can the CallExpression express the semantics of Calcite's AggregateCall?

IMO: Completely pushing down an aggregate is generally hard for distributed
systems. Usually we need a GROUP BY that exactly
matches the partitioning of the downstream storage. At the same time, the
benefit of removing the final aggregate is actually limited:
the local aggregate push down generally yields more than 80% of the CPU and
IO benefits. But I also agree that
the SupportsAggregatePushDown interface should be as generic as possible
for future extensions, while we keep confidence in the interface we design.

For core point (a): Because of the complexity of aggregates, one
LogicalAggregate node may expand to "Expand / Calc / LocalXXAgg / Exchange /
FinalXXAgg" in the physical phase. It seems we can't solve all cases with
only one rule, so I suggest that PushLocalAggIntoTableSourceScanRule focus
only on the pattern of TableSourceScan + LocalXXAggregate at present.

For core points (b & c): I think we can change the interface to be:
```
boolean applyAggregates(
    int[] groupingFields,
    List<CallExpression> aggregateExpressions,
    DataType producedDataType,
    List<int[]> groupingSets);
```


The grouping type can then be derived from these parameters:
Simple Group: groupingSets.size() == 1 && groupingSets.get(0).equals(groupingFields)
Cube Group: groupingSets.size() == IntMath.pow(2, groupingFields.cardinality())
Rollup: Reference org.apache.calcite.rel.core.Aggregate.Group#isRollup
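
As a rough sketch only (not part of the interface itself), the checks above
could look like the following. Note that the class and method names here are
made up for illustration, and that I translate cardinality() / equals() from
Calcite's ImmutableBitSet style into length / Arrays.equals on the proposed
int[] parameters:

```
import java.util.Arrays;
import java.util.List;

// Illustrative helper only: classifies the grouping type from the proposed
// parameters, mirroring the three conditions listed above.
public final class GroupingTypeSketch {

    public enum GroupingType { SIMPLE, CUBE, ROLLUP, OTHER }

    public static GroupingType classify(int[] groupingFields, List<int[]> groupingSets) {
        // Simple Group: one grouping set which equals the grouping fields.
        if (groupingSets.size() == 1
                && Arrays.equals(groupingSets.get(0), groupingFields)) {
            return GroupingType.SIMPLE;
        }
        // Cube Group: CUBE(f1, ..., fn) expands to 2^n grouping sets.
        if (groupingSets.size() == (1 << groupingFields.length)) {
            return GroupingType.CUBE;
        }
        // Rollup: ROLLUP(f1, ..., fn) expands to the n + 1 prefixes of the
        // grouping fields (compare org.apache.calcite.rel.core.Aggregate.Group#isRollup).
        if (isRollup(groupingFields, groupingSets)) {
            return GroupingType.ROLLUP;
        }
        return GroupingType.OTHER;
    }

    private static boolean isRollup(int[] groupingFields, List<int[]> groupingSets) {
        if (groupingSets.size() != groupingFields.length + 1) {
            return false;
        }
        boolean[] seenLength = new boolean[groupingFields.length + 1];
        for (int[] set : groupingSets) {
            boolean isPrefix = set.length <= groupingFields.length
                    && Arrays.equals(set, Arrays.copyOfRange(groupingFields, 0, set.length));
            if (!isPrefix || seenLength[set.length]) {
                return false;
            }
            seenLength[set.length] = true;
        }
        return true;
    }
}
```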

Then we can handle the complex grouping cases. The connector developer of
the downstream storage should determine
whether it supports the associated grouping type. The FILTER and HAVING
clauses will be converted into related Calc RelNodes
and are no longer part of the LocalAggregate node, so the CallExpression may
be sufficient to express the semantics of AggregateCall.
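
For example, a connector implementation might look roughly like the sketch
below. Everything here is hypothetical: the class name, the whitelist of
aggregate functions and the decision to support only simple grouping are
just one possible choice for a JDBC-like storage, assuming the proposed
signature:

```
import java.util.Arrays;
import java.util.List;

import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;

// Hypothetical connector-side sketch of the proposed applyAggregates method:
// only a plain GROUP BY and a small set of aggregate functions are accepted,
// everything else returns false so the planner keeps the original LocalAgg.
public class ExampleAggregatePushDownSource {

    private int[] pushedGroupingFields;
    private List<CallExpression> pushedAggregates;
    private DataType pushedProducedDataType;

    public boolean applyAggregates(
            int[] groupingFields,
            List<CallExpression> aggregateExpressions,
            DataType producedDataType,
            List<int[]> groupingSets) {

        // This storage only supports simple grouping.
        boolean simpleGrouping = groupingSets.size() == 1
                && Arrays.equals(groupingSets.get(0), groupingFields);
        if (!simpleGrouping) {
            return false;
        }

        // All-or-nothing: reject the push-down if any function is unsupported.
        for (CallExpression agg : aggregateExpressions) {
            FunctionDefinition definition = agg.getFunctionDefinition();
            boolean supported = definition == BuiltInFunctionDefinitions.SUM
                    || definition == BuiltInFunctionDefinitions.COUNT
                    || definition == BuiltInFunctionDefinitions.MIN
                    || definition == BuiltInFunctionDefinitions.MAX;
            if (!supported) {
                return false;
            }
        }

        this.pushedGroupingFields = groupingFields;
        this.pushedAggregates = aggregateExpressions;
        this.pushedProducedDataType = producedDataType;
        return true;
    }
}
```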

What do you think? Looking forward to our further discussion.


On Wed, Jan 6, 2021 at 2:24 PM Jingsong Li <[hidden email]> wrote:

> > I think filter expressions and grouping sets are semantic arguments
> instead of utilities. If we want to push them into sources, the connector
> developers should be aware of them.Wrapping them in a context implicitly is
> error-prone that the existing connector will produce wrong results when
> upgrading to new Flink versions.
>
> We can have some mechanism to check the upgrading.
>
> > I think for these cases, providing a new default method to override
> might be a better choice.
>
> Then we will have three or more methods. For the API level, I really don't
> like it...
>
> Best,
> Jingsong
>
> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <[hidden email]> wrote:
>
>> I think filter expressions and grouping sets are semantic arguments
>> instead of utilities.
>> If we want to push them into sources, the connector developers should be
>> aware of them.
>> Wrapping them in a context implicitly is error-prone that the existing
>> connector will produce wrong results
>>  when upgrading to new Flink versions (as we are pushing
>> grouping_sets/filter_args, but connector ignores it).
>> I think for these cases, providing a new default method to override might
>> be a better choice.
>>
>> Best,
>> Jark
>>
>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <[hidden email]> wrote:
>>
>>> Hi,
>>>
>>> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE d >
>>> 1)). Can we push it down? I'm not sure that a single call expression can
>>> express it, and how we should embody it and convey it to users.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <[hidden email]>
>>> wrote:
>>>
>>>> Hi Jark,
>>>>
>>>> I don't want to limit this interface to LocalAgg Push down. Actually,
>>>> sometimes, we can push whole aggregation to source too.
>>>>
>>>> So, this rule can do something more advanced. For example, we can push
>>>> down group sets to source too, for the SQL: "GROUP BY GROUPING SETS (f1,
>>>> f2)". Then, we need to add more information to push down.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <[hidden email]> wrote:
>>>>
>>>>> I think this may be over designed. We should have confidence in the
>>>>> interface we design, the interface should be stable.
>>>>> Wrapping things in a big context has a cost of losing user
>>>>> convenience.
>>>>> Foremost, we don't see any parameters to add in the future. Do you
>>>>> know any potential parameters?
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi Sebastian,
>>>>>>
>>>>>> Well, I mean:
>>>>>>
>>>>>> `boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>>> aggregateExpressions, DataType producedDataType);`
>>>>>> VS
>>>>>> ```
>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>
>>>>>> interface Aggregation {
>>>>>>   int[] groupingFields();
>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>   DataType producedDataType();
>>>>>> }
>>>>>> ```
>>>>>>
>>>>>> Maybe I've over considered it, but I think Aggregation is a
>>>>>> complicated thing. Maybe we need to extend its parameters in the future, so
>>>>>> make the parameters interface, which is conducive to the future expansion
>>>>>> without destroying the compatibility of user implementation. If it is the
>>>>>> way before, users need to modify the code.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong
>>>>>>
>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jinsong,
>>>>>>>
>>>>>>> Thx a lot for your suggestion. These points really need to be clear
>>>>>>> in the proposal.
>>>>>>>
>>>>>>> For the semantic problem, I think the main point is the different
>>>>>>> returned data types
>>>>>>> for the target aggregate function and the row format returned by the
>>>>>>> underlying storage.
>>>>>>> That's why we provide the producedDataType in the
>>>>>>> SupportsAggregatePushDown interface.
>>>>>>> Need to let developers know that we need to handle the semantic
>>>>>>> differences between
>>>>>>> the underlying storage system and Flink in related connectors.
>>>>>>> [Supplemented in proposal]
>>>>>>>
>>>>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule rule,
>>>>>>> it's also a key point.
>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE rule
>>>>>>> set, and better to put it
>>>>>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>
>>>>>>> For the scalability of the interface, actually I don't exactly
>>>>>>> understand your suggestion. Is it to add
>>>>>>> an abstract class, to implement the SupportsAggregatePushDown
>>>>>>> interface, and holds the
>>>>>>> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
>>>>>>> DataType producedDataType`
>>>>>>> fields?
>>>>>>>
>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>
>>>>>>> Jingsong Li <[hidden email]> 于2021年1月5日周二 下午2:44写道:
>>>>>>>
>>>>>>>> Thanks for your proposal! Sebastian.
>>>>>>>>
>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful discussion
>>>>>>>> has solved
>>>>>>>> many of my concerns.
>>>>>>>>
>>>>>>>> ## Semantic problems
>>>>>>>>
>>>>>>>> We may need to add some mechanisms or comments, because as far as I
>>>>>>>> know,
>>>>>>>> the semantics of each database is actually different, which may
>>>>>>>> need to be
>>>>>>>> reflected in your specific implementation.
>>>>>>>>
>>>>>>>> For example, the AVG output types of various databases may be
>>>>>>>> different.
>>>>>>>> For example, MySQL outputs double, this is different from Flink.
>>>>>>>> What
>>>>>>>> should we do? (Lucky, avg will be splitted into sum and count, But
>>>>>>>> we also
>>>>>>>> need care about decimal and others)
>>>>>>>>
>>>>>>>> ## The phase of push-down rule
>>>>>>>>
>>>>>>>> I strongly recommend that you do not put it in the Volcano phase,
>>>>>>>> which may
>>>>>>>> make the cost calculation very troublesome.
>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>
>>>>>>>> ## About interface
>>>>>>>>
>>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>>> `Aggregate`
>>>>>>>> interface, it contains `List<CallExpression> aggregateExpressions,
>>>>>>>> int[]
>>>>>>>> groupingFields, DataType producedDataType` fields. In this way, we
>>>>>>>> can add
>>>>>>>> fields easily without breaking compatibility.
>>>>>>>>
>>>>>>>> I think the current design is very good, just put forward some
>>>>>>>> ideas.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong
>>>>>>>>
>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> > Hi Jark,
>>>>>>>> >
>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>> >
>>>>>>>> > For (1) Agree: Yeah, the upstream only need to know if the
>>>>>>>> TableSource can
>>>>>>>> > handle all of the aggregates.
>>>>>>>> > It's better to just return a boolean type to indicate whether all
>>>>>>>> of
>>>>>>>> > aggregates push down was successful or not. [Resolved in proposal]
>>>>>>>> >
>>>>>>>> > For (2) Agree: The aggOutputDataType represent the produced data
>>>>>>>> type of
>>>>>>>> > the new table source to make sure that the new table source can
>>>>>>>> > connect with the related exchange node. The format of this
>>>>>>>> > aggOutputDataType is groupedFields's type + agg function's return
>>>>>>>> type.
>>>>>>>> > The reason for adding this parameter in this function is also to
>>>>>>>> facilitate
>>>>>>>> > the user to build the final output type. I have changed this
>>>>>>>> parameter
>>>>>>>> > to be producedDataType. [Resolved in proposal]
>>>>>>>> >
>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have changed
>>>>>>>> to use
>>>>>>>> > groupingFields. [Resolved in proposal]
>>>>>>>> >
>>>>>>>> > Thx again for the suggestion, looking for the further discussion.
>>>>>>>> >
>>>>>>>> > Jark Wu <[hidden email]> 于2021年1月5日周二 下午12:05写道:
>>>>>>>> >
>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>> > >
>>>>>>>> > > Regarding to the updated interface,
>>>>>>>> > >
>>>>>>>> > > Result applyAggregates(List<CallExpression>
>>>>>>>> aggregateExpressions,
>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>> > >
>>>>>>>> > > final class Result {
>>>>>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>>>>>> > >        private final List<CallExpression> remainingAggregates;
>>>>>>>> > > }
>>>>>>>> > >
>>>>>>>> > > I have following comments:
>>>>>>>> > >
>>>>>>>> > > 1) Do we need the composite Result return type? Is a boolean
>>>>>>>> return type
>>>>>>>> > > enough?
>>>>>>>> > >     From my understanding, all of the aggregates should be
>>>>>>>> accepted,
>>>>>>>> > > otherwise the pushdown should fail.
>>>>>>>> > >     Therefore, users don't need to distinguish which aggregates
>>>>>>>> are
>>>>>>>> > > "accepted".
>>>>>>>> > >
>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced data
>>>>>>>> type of the
>>>>>>>> > > new source, or just the return type of all the agg functions?
>>>>>>>> > >     I would prefer to `producedDataType` just like
>>>>>>>> > > `SupportsReadingMetadata` to reduce the effort for users to
>>>>>>>> concat a
>>>>>>>> > final
>>>>>>>> > > output type.
>>>>>>>> > >     The return type of each agg function can be obtained from
>>>>>>>> the
>>>>>>>> > > `CallExpression`.
>>>>>>>> > >
>>>>>>>> > > 3) What do you think about renaming `groupSet` to `grouping` or
>>>>>>>> > > `groupedFields` ?
>>>>>>>> > >     The `groupSet` may confuse users that it relates to
>>>>>>>> "grouping sets".
>>>>>>>> > >
>>>>>>>> > >
>>>>>>>> > > What do you think?
>>>>>>>> > >
>>>>>>>> > > Best,
>>>>>>>> > > Jark
>>>>>>>> > >
>>>>>>>> > >
>>>>>>>> > >
> --
> Best, Jingsong Lee
>


--

*With kind regards
------------------------------------------------------------
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: [hidden email] <[hidden email]>
QQ: 3239559*

Re: Support local aggregate push down for Blink batch planner

Jark Wu-2
Hi Liu, Jingsong,

Regarding the agg with filter, I think in theory we can support pushing
such a pattern into the source.
We don't need to support it in the first version, but we can add it in the
long term, so the designed interface should be future-proof.

Considering that the filter argument and the distinct flag should be part
of the aggregate expression, I'm wondering whether CallExpression is a good
representation for it.
What do you think about proposing the following `AggregateExpression` to
replace the `CallExpression`?

class AggregateExpression implements ResolvedExpression {
    private final FunctionDefinition functionDefinition;
    private final List<FieldReferenceExpression> args;
    private final @Nullable CallExpression filterExpr;
    private final boolean distinct;
}
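
To illustrate the idea, an aggregate such as COUNT(DISTINCT cc_class)
FILTER(WHERE cc_call_center_sk > 3), using the columns from the earlier
examples, would then be described by a single AggregateExpression:
functionDefinition = COUNT, args referencing cc_class, filterExpr holding
the cc_call_center_sk > 3 call, and distinct = true.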

Besides, we don't need both groupingFields and groupingSets:
`groupingSets` is a generalization of `groupingFields` (a simple GROUP BY is
just a single grouping set).
Then the interface of SupportsAggregatePushDown can be:

interface SupportsAggregatePushDown {

  boolean applyAggregates(
    List<int[]> groupingSets,
    List<AggregateExpression> aggregates,
    DataType producedDataType);
}
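
For illustration, a connector could implement it roughly as in the sketch
below. This is only a sketch assuming the AggregateExpression and
SupportsAggregatePushDown interfaces proposed above; the class name and the
accessor names (getFunctionDefinition, getFilterExpression, isDistinct) are
made up for the illustration and are not part of the proposal.

import java.util.List;

import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;

public class MyJdbcTableSource implements DynamicTableSource, SupportsAggregatePushDown {

    // Captured push-down state, later used to generate something like
    // "SELECT <group fields>, SUM(x), COUNT(y) FROM t GROUP BY <group fields>".
    private List<int[]> pushedGroupingSets;
    private List<AggregateExpression> pushedAggregates;
    private DataType producedDataType;

    @Override
    public boolean applyAggregates(
            List<int[]> groupingSets,
            List<AggregateExpression> aggregates,
            DataType producedDataType) {
        // All-or-nothing: if any aggregate cannot be translated to the
        // external system, reject the whole push-down and let Flink compute it.
        for (AggregateExpression agg : aggregates) {
            if (!canTranslate(agg)) {
                return false;
            }
        }
        this.pushedGroupingSets = groupingSets;
        this.pushedAggregates = aggregates;
        this.producedDataType = producedDataType;
        return true;
    }

    private boolean canTranslate(AggregateExpression agg) {
        // A first version could support only plain SUM / COUNT / MIN / MAX,
        // without DISTINCT or FILTER.
        if (agg.isDistinct() || agg.getFilterExpression() != null) {
            return false;
        }
        FunctionDefinition f = agg.getFunctionDefinition();
        return f == BuiltInFunctionDefinitions.SUM
                || f == BuiltInFunctionDefinitions.COUNT
                || f == BuiltInFunctionDefinitions.MIN
                || f == BuiltInFunctionDefinitions.MAX;
    }

    // copy() and asSummaryString() from DynamicTableSource are stubbed here;
    // the actual scan runtime provider is omitted from the sketch.
    @Override
    public DynamicTableSource copy() {
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public String asSummaryString() {
        return "MyJdbcTableSource";
    }
}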

What do you think?

Best,
Jark

On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <[hidden email]> wrote:

> Hi Jingsong, Jark,
>
> Thx so much for the discussion; the cases mentioned above are really worth
> further discussion.
>
> 1. For an aggregate with a filter expression, e.g. select COUNT(1) FILTER(WHERE
> cc_call_center_sk > 3) from call_center;
> For the current Blink Planner, the optimized plan will be:
> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg ->
> Exchange -> FinalAgg.
> As there is a Calc above the TableSource, this pattern can't match the
> LocalAggPushDownRule in the current design.
>
> 2. For the grouping set or rollup use case, e.g. select COUNT(1) from
> call_center group by rollup(cc_class, cc_employees);
> For the current Blink Planner, the optimized plan will be:
> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
> It's also not covered by the current LocalAggPushDownRule design.
>
> 3. I want to add a case which we haven't discussed yet: aggregate with a
> HAVING clause.
> eg: select COUNT(1) from call_center group by cc_class having
> max(cc_tax_percentage) > 0.2;
> For the current Blink Planner, the optimized plan will be:
> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg -> Calc(where=[>($f2,
> 0.2:DECIMAL(2, 1))]).
>
> The core discussion points are summarized as follows:
> a) Aggregate is a more complex scenario than predicates or limits, and it
> also depends on the different underlying storages.
> b) A single rule seemingly can't cover all aggregate scenarios, but can the
> SupportsAggregatePushDown interface be a bit more general?
> c) Could the CallExpression express the semantics of Calcite's
> AggregateCall?
>
> IMO: Completely pushing down an aggregate is generally hard for distributed
> systems: usually the GROUP BY has to exactly match the partitioning of the
> downstream storage. At the same time, the benefit of removing the final
> aggregate is actually limited; the local aggregate push-down alone generally
> yields more than 80% of the CPU and IO benefits. But I also agree that
> the SupportsAggregatePushDown interface should be as generic as possible
> for future extensions, while keeping confidence in the interface we design.
>
> For core point (a): Due to the complexity of aggregates, one LogicalAggregate
> node may expand to "Expand / Calc / LocalXXAgg / Exchange / FinalXXAgg"
> in the physical phase. It seems we can't solve all cases with only one
> rule, so I suggest that PushLocalAggIntoTableSourceScanRule focuses only
> on the pattern of TableSourceScan + LocalXXAggregate at present.
>
> For core points (b & c): I think we can change the interface to be:
> ```
>
> boolean applyAggregates(
>     int[] groupingFields,
>     List<CallExpression> aggregateExpressions,
>     DataType producedDataType,
>     List<int[]> groupingSets);
>
> ```
>
>
> Simple Group: groupingSets.size() == 1 &&
> groupingSets.get(0).equals(groupingFields)
> Cube Group: groupingSets.size() == IntMath.pow(2,
> groupingFields.cardinality())
> Rollup: reference org.apache.calcite.rel.core.Aggregate.Group#isRollup
>
> Then we can handle the complex grouping cases. The connector developer of
> the downstream storage should determine whether it supports the associated
> grouping type (see the sketch below). As for the FILTER and HAVING clauses,
> they are converted into related Calc RelNodes and are no longer part of the
> LocalAggregate node, so the CallExpression may be sufficient to express the
> semantics of AggregateCall.
>
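> As a minimal sketch (the class and helper names are made up for
> illustration, roughly mirroring org.apache.calcite.rel.core.Aggregate.Group,
> and assuming the grouping sets are ordered from the full grouping down to
> the empty set), the classification could look like:
>
> ```
> import java.util.Arrays;
> import java.util.List;
>
> final class GroupingKind {
>
>     enum Type { SIMPLE, CUBE, ROLLUP, OTHER }
>
>     static Type classify(int[] groupingFields, List<int[]> groupingSets) {
>         if (groupingSets.size() == 1
>                 && Arrays.equals(groupingSets.get(0), groupingFields)) {
>             return Type.SIMPLE;
>         }
>         if (groupingSets.size() == (1 << groupingFields.length)) {
>             // 2^n grouping sets, matching a cube over the grouping fields
>             return Type.CUBE;
>         }
>         if (isRollup(groupingFields, groupingSets)) {
>             return Type.ROLLUP;
>         }
>         return Type.OTHER;
>     }
>
>     // Rollup check: the i-th grouping set is the first (n - i) grouping fields.
>     private static boolean isRollup(int[] groupingFields, List<int[]> groupingSets) {
>         int n = groupingFields.length;
>         if (groupingSets.size() != n + 1) {
>             return false;
>         }
>         for (int i = 0; i <= n; i++) {
>             int[] expected = Arrays.copyOf(groupingFields, n - i);
>             if (!Arrays.equals(groupingSets.get(i), expected)) {
>                 return false;
>             }
>         }
>         return true;
>     }
> }
> ```
>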
> What do you think? Looking forward to our further discussion.
>
>
> --
>
> *With kind regards
> ------------------------------------------------------------
> Sebastian Liu 刘洋
> Institute of Computing Technology, Chinese Academy of Science
> Mobile\WeChat: +86—15201613655
> E-mail: [hidden email] <[hidden email]>
> QQ: 3239559*
>
>