Support local aggregate push down for Blink batch planner

Re: Support local aggregate push down for Blink batch planner

Sebastian Liu
Hi Jark,

Sounds good to me. For better extensibility in the future, we could add the
AggregateExpression:
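```

public class AggregateExpression implements ResolvedExpression {

   // The aggregate function being called, e.g. COUNT or SUM.
   private final FunctionDefinition functionDefinition;

   // Input fields the function is applied to.
   private final List<FieldReferenceExpression> args;

   // Optional FILTER (WHERE ...) condition; null when absent.
   private final @Nullable CallExpression filterExpression;

   // Result type of this aggregate call.
   private final DataType resultType;

   // Modifiers of the call: DISTINCT, APPROXIMATE, IGNORE NULLS.
   private final boolean distinct;
   private final boolean approximate;
   private final boolean ignoreNulls;

   // Constructor, getters and the ResolvedExpression methods are omitted here.
}
```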

And we really need only one groupingSets parameter for grouping. I have
updated the related interface in the proposal.
I appreciate the continued feedback and help.
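
To make the all-or-nothing contract of the updated interface concrete, here is a minimal sketch of how a connector might answer `applyAggregates`. Everything prefixed with `Demo` is a hypothetical, simplified stand-in invented for illustration (a `String` replaces `DataType`, and the expression class carries only a few of the fields above); it is not the real Flink API, and the set of supported functions is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for the AggregateExpression proposed above;
// it is NOT the real Flink class.
final class DemoAggregateExpression {
    final String functionName;   // e.g. "COUNT"
    final List<Integer> args;    // indexes of the input fields
    final boolean hasFilter;     // true if a FILTER (WHERE ...) clause is attached
    final boolean distinct;

    DemoAggregateExpression(String functionName, List<Integer> args,
                            boolean hasFilter, boolean distinct) {
        this.functionName = functionName;
        this.args = args;
        this.hasFilter = hasFilter;
        this.distinct = distinct;
    }
}

// Same shape as the interface discussed in this thread, with a String
// standing in for DataType.
interface DemoSupportsAggregatePushDown {
    // Returns true only if the source takes over ALL of the aggregates.
    boolean applyAggregates(List<int[]> groupingSets,
                            List<DemoAggregateExpression> aggregates,
                            String producedDataType);
}

// A made-up source that accepts a single grouping set and plain
// COUNT/SUM/MIN/MAX calls, and rejects everything else.
class DemoJdbcLikeSource implements DemoSupportsAggregatePushDown {
    private static final List<String> SUPPORTED =
            Arrays.asList("COUNT", "SUM", "MIN", "MAX");

    @Override
    public boolean applyAggregates(List<int[]> groupingSets,
                                   List<DemoAggregateExpression> aggregates,
                                   String producedDataType) {
        if (groupingSets.size() != 1) {
            return false; // GROUPING SETS / ROLLUP / CUBE are not handled here
        }
        for (DemoAggregateExpression agg : aggregates) {
            if (agg.hasFilter || agg.distinct || !SUPPORTED.contains(agg.functionName)) {
                return false; // all-or-nothing: the planner keeps its own LocalAgg
            }
        }
        return true;
    }
}
```

Returning `false` leaves the plan untouched, so a connector can start with a narrow set of supported cases and widen it later.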

On Wed, Jan 6, 2021 at 9:34 PM Jark Wu <[hidden email]> wrote:

> Hi Liu, Jingsong,
>
> Regarding the agg with filter, I think in theory we can support pushing
> such a pattern into the source.
> We don't need to support it in the first version, but in the long term we
> can.
> The interface we design should be future-proof.
>
> Considering that the filter arg and distinct flag should be part of the
> aggregate expression, I'm wondering whether CallExpression is a good
> representation for it.
> What do you think about proposing the following `AggregateExpression` to
> replace the `CallExpression`?
>
> class AggregateExpression implements ResolvedExpression {
>     private final FunctionDefinition functionDefinition;
>     private final List<FieldReferenceExpression> args;
>     private final @Nullable CallExpression filterExpr;
>     private final boolean distinct;
> }
>
> Besides, we don't need both groupingFields and groupingSets.
> `groupingSets` should be a superset of groupingFields.
> Then the interface of SupportsAggregatePushDown can be:
>
> interface SupportsAggregatePushDown {
>
>   boolean applyAggregates(
>     List<int[]> groupingSets,
>     List<AggregateExpression> aggregates,
>     DataType producedDataType);
> }
>
> What do you think?
>
> Best,
> Jark
>
> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <[hidden email]> wrote:
>
>> Hi Jingsong, Jark,
>>
>> Thx so much for the discussion; the cases mentioned above are really
>> worth discussing further.
>>
>> 1. For aggregates with filter expressions, e.g. select COUNT(1)
>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>> For the current Blink Planner, the optimized plan will be:
>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg ->
>> Exchange -> FinalAgg.
>> As there is a Calc above the TableSource, this pattern can't match the
>> LocalAggPushDownRule in the current design.
>>
>> 2. For the grouping sets or rollup use case, e.g. select COUNT(1) from
>> call_center group by rollup(cc_class, cc_employees);
>> For the current Blink Planner, the optimized plan will be:
>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
>> It's also not covered by the current LocalAggPushDownRule design.
>>
>> 3. I want to add a case which we haven't discussed yet: aggregate with a
>> HAVING clause,
>> e.g. select COUNT(1) from call_center group by cc_class having
>> max(cc_tax_percentage) > 0.2;
>> For the current Blink Planner, the optimized plan will be:
>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg -> Calc(where=[>($f2,
>> 0.2:DECIMAL(2, 1))]).
>>
>> The core discussion points are summarized as follows:
>> a) Aggregate is a more complex scenario than predicates or limits, and
>> also depends on the underlying storage.
>> b) One rule can't seem to cover all aggregate scenarios, but can the
>> SupportsAggregatePushDown interface be a bit more general?
>> c) Can CallExpression express the semantics of Calcite's
>> AggregateCall?
>>
>> IMO, pushing an aggregate down completely is generally hard for
>> distributed systems. Usually we need a GROUP BY that exactly matches the
>> partitioning in the downstream storage. At the same time, the benefit of
>> removing the final aggregate is actually limited: LocalAggPushDown alone
>> generally yields more than 80% of the CPU and IO benefits. But I also
>> agree that the SupportsAggregatePushDown interface should be as generic
>> as possible for future extensions, while we keep confidence in the
>> interface we design.
>>
>> For core point (a): Because of the complexity of aggregates, one
>> LogicalAggregate node may expand to "Expand / Calc / LocalXXAgg / Exchange
>> / FinalXXAgg" in the physical phase. It seems that we can't solve all
>> cases with only one rule, so I suggest that
>> PushLocalAggIntoTableSourceScanRule focus only on the pattern of
>> TableSourceScan + LocalXXAggregate at present.
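
A minimal sketch of the matching condition described here, applied to the three plans quoted above. It is not the actual planner rule: a plan is modelled as a plain list of operator labels from the source upward, and the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

class LocalAggPatternSketch {
    // The rule only fires when LocalAgg sits directly on top of TableSourceScan.
    static boolean matches(List<String> plan) {
        return plan.size() >= 2
                && plan.get(0).equals("TableSourceScan")
                && plan.get(1).equals("LocalAgg");
    }

    public static void main(String[] args) {
        // Case 1: aggregate with FILTER, a Calc sits between scan and LocalAgg.
        List<String> filterPlan = Arrays.asList(
                "TableSourceScan", "Calc", "LocalAgg", "Exchange", "FinalAgg");
        // Case 2: rollup, an Expand sits between scan and LocalAgg.
        List<String> rollupPlan = Arrays.asList(
                "TableSourceScan", "Expand", "LocalAgg", "Exchange", "FinalAgg", "Calc");
        // Case 3: HAVING, the Calc comes after FinalAgg.
        List<String> havingPlan = Arrays.asList(
                "TableSourceScan", "LocalAgg", "Exchange", "FinalAgg", "Calc");

        System.out.println(matches(filterPlan)); // false
        System.out.println(matches(rollupPlan)); // false
        System.out.println(matches(havingPlan)); // true
    }
}
```

Under this simplified reading, only the HAVING plan keeps the scan and the local aggregate adjacent, which is why the filter and rollup cases fall outside the first version of the rule.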
>>
>> For core points (b & c): I think we can change the interface to be:
>> ```
>>
>> boolean applyAggregates(
>>     int[] groupingFields,
>>     List<CallExpression> aggregateExpressions,
>>     DataType producedDataType,
>>     List<int[]> groupingSets);
>>
>> ```
>>
>>
>> Simple Group: groupingSets.size() == 1 &&
>> Arrays.equals(groupingSets.get(0), groupingFields)
>>
>> Cube Group: groupingSets.size() == IntMath.pow(2,
>> groupingFields.length)
>> Rollup: see org.apache.calcite.rel.core.Aggregate.Group#isRollup
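
The three checks above can be sketched as one small classifier. This is an illustrative approximation, not Calcite's implementation: the class name and the string labels are made up, the cube test is the size-only check from the message, and the rollup test assumes the grouping sets are listed from the longest prefix down to the empty set.

```java
import java.util.Arrays;
import java.util.List;

class GroupingKindSketch {
    static String classify(int[] groupingFields, List<int[]> groupingSets) {
        // Arrays.equals compares contents; int[].equals would compare references.
        if (groupingSets.size() == 1
                && Arrays.equals(groupingSets.get(0), groupingFields)) {
            return "SIMPLE";
        }
        // Checked before CUBE because for a single field both shapes coincide.
        if (isRollup(groupingFields, groupingSets)) {
            return "ROLLUP";
        }
        if (groupingSets.size() == (1 << groupingFields.length)) {
            return "CUBE"; // size-only check, as in the message above
        }
        return "OTHER";
    }

    // ROLLUP(a, b) yields (a, b), (a), (): each set drops the last field
    // of the previous one.
    static boolean isRollup(int[] groupingFields, List<int[]> groupingSets) {
        if (groupingSets.size() != groupingFields.length + 1) {
            return false;
        }
        for (int i = 0; i < groupingSets.size(); i++) {
            int[] expected = Arrays.copyOf(groupingFields, groupingFields.length - i);
            if (!Arrays.equals(groupingSets.get(i), expected)) {
                return false;
            }
        }
        return true;
    }
}
```

A connector could use such a classification to accept only the grouping kinds its storage can evaluate and return `false` for the rest.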
>>
>> Then we can handle the complex grouping cases. The connector developer
>> for the downstream storage should determine whether it supports the given
>> grouping type. The filter and having clauses are converted into Calc
>> RelNodes and are no longer part of the LocalAggregate node, so
>> CallExpression may be sufficient to express the semantics of
>> AggregateCall.
>>
>> What do you think? Looking forward to our further discussion.
>>
>>
>> On Wed, Jan 6, 2021 at 2:24 PM Jingsong Li <[hidden email]> wrote:
>>
>>> > I think filter expressions and grouping sets are semantic arguments
>>> > instead of utilities. If we want to push them into sources, the connector
>>> > developers should be aware of them. Wrapping them in a context implicitly
>>> > is error-prone: an existing connector would produce wrong results when
>>> > upgrading to new Flink versions.
>>>
>>> We can have some mechanism to check for this on upgrade.
>>>
>>> > I think for these cases, providing a new default method to override
>>> > might be a better choice.
>>>
>>> Then we will have three or more methods. At the API level, I really
>>> don't like it...
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <[hidden email]> wrote:
>>>
>>>> I think filter expressions and grouping sets are semantic arguments
>>>> rather than utilities.
>>>> If we want to push them into sources, the connector developers should
>>>> be aware of them.
>>>> Wrapping them in a context implicitly is error-prone: an existing
>>>> connector would produce wrong results
>>>> when upgrading to new Flink versions (as we would be pushing
>>>> grouping_sets/filter_args, but the connector ignores them).
>>>> I think for these cases, providing a new default method to override
>>>> might be a better choice.
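
A minimal sketch of the "new default method" idea, assuming a later Flink version wanted to add grouping sets to an interface that already shipped. All names here are hypothetical and `String` stands in for the expression type; the point is only that the default answers "not supported".

```java
import java.util.List;

interface DemoAggPushDown {
    // Method from the first version of the interface: plain GROUP BY only.
    boolean applyAggregates(int[] groupingFields, List<String> aggregates);

    // Hypothetical later addition for grouping sets. The default answers
    // "not supported", so a connector that predates it is never handed
    // grouping sets that it would silently ignore.
    default boolean applyAggregatesWithGroupingSets(
            List<int[]> groupingSets, List<String> aggregates) {
        return false;
    }
}

// Written against the first version; knows nothing about grouping sets.
class LegacyConnector implements DemoAggPushDown {
    @Override
    public boolean applyAggregates(int[] groupingFields, List<String> aggregates) {
        return true;
    }
}

// Opts in explicitly by overriding the new method.
class GroupingSetsConnector extends LegacyConnector {
    @Override
    public boolean applyAggregatesWithGroupingSets(
            List<int[]> groupingSets, List<String> aggregates) {
        return !groupingSets.isEmpty();
    }
}
```

The cost, as noted in the reply above, is that each new capability adds another method to the interface.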
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <[hidden email]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm also curious about aggregate with filter
>>>>> (COUNT(1) FILTER(WHERE d > 1)). Can we push it down? I'm not sure that a
>>>>> single call expression can express it, or how we should represent it
>>>>> and convey it to users.
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> I don't want to limit this interface to LocalAgg push down. Actually,
>>>>>> sometimes we can push the whole aggregation to the source too.
>>>>>>
>>>>>> So this rule can do something more advanced. For example, we can
>>>>>> push grouping sets down to the source too, for SQL like "GROUP BY
>>>>>> GROUPING SETS (f1, f2)". Then we need to add more information to the
>>>>>> push down.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong
>>>>>>
>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <[hidden email]> wrote:
>>>>>>
>>>>>>> I think this may be over-designed. We should have confidence in the
>>>>>>> interface we design; the interface should be stable.
>>>>>>> Wrapping things in a big context costs user convenience.
>>>>>>> Above all, we don't see any parameters to add in the future. Do you
>>>>>>> know of any potential parameters?
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <[hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Sebastian,
>>>>>>>>
>>>>>>>> Well, I mean:
>>>>>>>>
>>>>>>>> `boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>>>>> aggregateExpressions, DataType producedDataType);`
>>>>>>>> VS
>>>>>>>> ```
>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>
>>>>>>>> interface Aggregation {
>>>>>>>>   int[] groupingFields();
>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>   DataType producedDataType();
>>>>>>>> }
>>>>>>>> ```
>>>>>>>>
>>>>>>>> Maybe I'm overthinking it, but I think aggregation is a complicated
>>>>>>>> thing. We may need to extend its parameters in the future, so I'd
>>>>>>>> make the parameters an interface, which allows future expansion
>>>>>>>> without breaking the compatibility of user implementations. With the
>>>>>>>> former approach, users would need to modify their code.
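
A minimal sketch of the compatibility argument for the parameter-object variant. The interfaces mirror the ones in the message above with `String` standing in for `CallExpression` and `DataType`; the `groupingSets()` accessor is a hypothetical later addition, and `DemoContextPushDown` and `OldContextConnector` are made-up names.

```java
import java.util.Collections;
import java.util.List;

// Parameter object; the planner, not the connector, would implement it.
interface Aggregation {
    int[] groupingFields();
    List<String> aggregateExpressions();
    String producedDataType();

    // Hypothetical later addition. Connectors written against the three
    // accessors above still compile, because the signature of the method
    // they implement is unchanged.
    default List<int[]> groupingSets() {
        return Collections.singletonList(groupingFields());
    }
}

interface DemoContextPushDown {
    boolean applyAggregates(Aggregation agg);
}

// A connector written before groupingSets() existed.
class OldContextConnector implements DemoContextPushDown {
    @Override
    public boolean applyAggregates(Aggregation agg) {
        return agg.groupingFields().length <= 1
                && agg.aggregateExpressions().contains("COUNT");
    }
}
```

The old connector keeps compiling, but it also never looks at `groupingSets()`, which is the silent-ignore risk raised elsewhere in this thread.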
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong
>>>>>>>>
>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>> [hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hi Jingsong,
>>>>>>>>>
>>>>>>>>> Thx a lot for your suggestion. These points really need to be
>>>>>>>>> clear in the proposal.
>>>>>>>>>
>>>>>>>>> For the semantic problem, I think the main point is the difference
>>>>>>>>> between the data type returned by the target aggregate function and
>>>>>>>>> the row format returned by the underlying storage.
>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>> We need to let developers know that connectors must handle the
>>>>>>>>> semantic differences between the underlying storage system and
>>>>>>>>> Flink. [Supplemented in proposal]
>>>>>>>>>
>>>>>>>>> The phase of the new PushLocalAggIntoTableSourceScanRule is also a
>>>>>>>>> key point.
>>>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE rule
>>>>>>>>> set, preferably after the EnforceLocalXXAggRule.
>>>>>>>>> [Supplemented in proposal]
>>>>>>>>>
>>>>>>>>> As for the extensibility of the interface, I don't fully
>>>>>>>>> understand your suggestion. Is it to add an abstract class that
>>>>>>>>> implements the SupportsAggregatePushDown interface and holds the
>>>>>>>>> `List<CallExpression> aggregateExpressions, int[]
>>>>>>>>> groupingFields, DataType producedDataType`
>>>>>>>>> fields?
>>>>>>>>>
>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>
>>>>>>>>> On Tue, Jan 5, 2021 at 2:44 PM Jingsong Li <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for your proposal, Sebastian!
>>>>>>>>>>
>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful discussion
>>>>>>>>>> has solved
>>>>>>>>>> many of my concerns.
>>>>>>>>>>
>>>>>>>>>> ## Semantic problems
>>>>>>>>>>
>>>>>>>>>> We may need to add some mechanisms or comments, because as far as
>>>>>>>>>> I know, the semantics of each database are actually different, which
>>>>>>>>>> may need to be reflected in the specific implementation.
>>>>>>>>>>
>>>>>>>>>> For example, the AVG output types of various databases may differ:
>>>>>>>>>> MySQL outputs double, which is different from Flink. What should we
>>>>>>>>>> do? (Luckily, AVG will be split into SUM and COUNT, but we also need
>>>>>>>>>> to take care of decimal and other types.)
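
A small worked sketch of why splitting AVG into SUM and COUNT matters for push down: each split returns a partial (sum, count) pair and the division happens only once, in the final aggregate. The class and method names are made up, and `long`/`double` are used only for brevity; the concern about DECIMAL and other types raised above is not addressed here.

```java
class AvgSplitSketch {
    // What a source would return per split: SUM and COUNT instead of AVG.
    static final class Partial {
        final long sum;
        final long count;

        Partial(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Local aggregation over the rows of one split.
    static Partial local(long... values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return new Partial(sum, values.length);
    }

    // Final aggregation: merge the partials, then divide once.
    // NaN stands in for SQL NULL on empty input (a simplification).
    static double finalAvg(Partial... partials) {
        long sum = 0;
        long count = 0;
        for (Partial p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return count == 0 ? Double.NaN : (double) sum / count;
    }
}
```

For splits {1, 2, 3} and {10}, merging the partials gives 16 / 4 = 4.0, whereas averaging the two per-split averages (2 and 10) would give 6.0. That is why the source has to hand back SUM and COUNT rather than its own AVG.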
>>>>>>>>>>
>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>
>>>>>>>>>> I strongly recommend that you do not put it in the Volcano phase,
>>>>>>>>>> which may
>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>
>>>>>>>>>> ## About interface
>>>>>>>>>>
>>>>>>>>>> For extensibility, I'd lean towards introducing an `Aggregate`
>>>>>>>>>> interface that contains the `List<CallExpression>
>>>>>>>>>> aggregateExpressions, int[] groupingFields, DataType
>>>>>>>>>> producedDataType` fields. In this way, we can add fields easily
>>>>>>>>>> without breaking compatibility.
>>>>>>>>>>
>>>>>>>>>> I think the current design is very good; I'm just putting forward
>>>>>>>>>> some ideas.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> > Hi Jark,
>>>>>>>>>> >
>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>> >
>>>>>>>>>> > For (1) Agree: Yeah, the upstream only needs to know whether the
>>>>>>>>>> > TableSource can handle all of the aggregates.
>>>>>>>>>> > It's better to just return a boolean to indicate whether pushing
>>>>>>>>>> > down all of the aggregates succeeded. [Resolved in proposal]
>>>>>>>>>> >
>>>>>>>>>> > For (2) Agree: The aggOutputDataType represents the produced data
>>>>>>>>>> > type of the new table source, to make sure that the new table
>>>>>>>>>> > source can connect with the related exchange node. The format of
>>>>>>>>>> > this aggOutputDataType is the grouped fields' types followed by
>>>>>>>>>> > the agg functions' return types.
>>>>>>>>>> > The reason for adding this parameter to the function is also to
>>>>>>>>>> > make it easier for the user to build the final output type. I have
>>>>>>>>>> > renamed this parameter to producedDataType. [Resolved in proposal]
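
A minimal sketch of the layout described in (2): the produced row is the types of the grouped fields followed by the return types of the aggregate functions. Types are plain strings here and the helper name is made up; in the real interface the planner would hand over a ready-made `DataType`.

```java
import java.util.ArrayList;
import java.util.List;

class ProducedTypeSketch {
    // inputTypes: field types of the table before aggregation.
    // groupingFields: indexes into inputTypes.
    // aggReturnTypes: one return type per aggregate call, in call order.
    static List<String> producedFieldTypes(List<String> inputTypes,
                                           int[] groupingFields,
                                           List<String> aggReturnTypes) {
        List<String> produced = new ArrayList<>();
        for (int index : groupingFields) {
            produced.add(inputTypes.get(index)); // grouping keys come first
        }
        produced.addAll(aggReturnTypes);         // then the aggregate results
        return produced;
    }
}
```

For an input of (INT, STRING, DOUBLE) grouped by field 1 with a single COUNT returning BIGINT, this yields (STRING, BIGINT), the row shape the exchange node above the source would expect.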
>>>>>>>>>> >
>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users; I have changed
>>>>>>>>>> > it to groupingFields. [Resolved in proposal]
>>>>>>>>>> >
>>>>>>>>>> > Thx again for the suggestion; looking forward to further
>>>>>>>>>> > discussion.
>>>>>>>>>> >
>>>>>>>>>> > On Tue, Jan 5, 2021 at 12:05 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>>> >
>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>> > >
>>>>>>>>>> > > Regarding the updated interface,
>>>>>>>>>> > >
>>>>>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>> > >
>>>>>>>>>> > > final class Result {
>>>>>>>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>>>>>>>> > >        private final List<CallExpression> remainingAggregates;
>>>>>>>>>> > > }
>>>>>>>>>> > >
>>>>>>>>>> > > I have the following comments:
>>>>>>>>>> > >
>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a boolean
>>>>>>>>>> > > return type enough?
>>>>>>>>>> > >     From my understanding, all of the aggregates should be
>>>>>>>>>> > > accepted, otherwise the pushdown should fail.
>>>>>>>>>> > >     Therefore, users don't need to distinguish which aggregates
>>>>>>>>>> > > are "accepted".
>>>>>>>>>> > >
>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced data type
>>>>>>>>>> > > of the new source, or just the return type of all the agg
>>>>>>>>>> > > functions?
>>>>>>>>>> > >     I would prefer `producedDataType`, just like
>>>>>>>>>> > > `SupportsReadingMetadata`, to reduce the effort for users to
>>>>>>>>>> > > concatenate a final output type.
>>>>>>>>>> > >     The return type of each agg function can be obtained from the
>>>>>>>>>> > > `CallExpression`.
>>>>>>>>>> > >
>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to `grouping` or
>>>>>>>>>> > > `groupedFields`?
>>>>>>>>>> > >     The `groupSet` name may mislead users into thinking it relates
>>>>>>>>>> > > to "grouping sets".
>>>>>>>>>> > >
>>>>>>>>>> > >
>>>>>>>>>> > > What do you think?
>>>>>>>>>> > >
>>>>>>>>>> > > Best,
>>>>>>>>>> > > Jark
>>>>>>>>>> > >
>>>>>>>>>> > >
>>>>>>>>>> > >
>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <[hidden email]>
>>>>>>>>>> wrote:
>>>>>>>>>> > >
>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>> > >>
>>>>>>>>>> > >> Best,
>>>>>>>>>> > >> Kurt
>>>>>>>>>> > >>
>>>>>>>>>> > >>
>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>>>>>> [hidden email]>
>>>>>>>>>> > >> wrote:
>>>>>>>>>> > >>
>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>> > >>>
>>>>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is more like a
>>>>>>>>>> > >>> physical operator than a logical operator, I think your
>>>>>>>>>> > >>> suggestion should be idea #2, which handles everything in the
>>>>>>>>>> > >>> physical optimization phase?
>>>>>>>>>> > >>>
>>>>>>>>>> > >>> Looking forward to further discussion.
>>>>>>>>>> > >>>
>>>>>>>>>> > >>>
>>>>>>>>>> > >>> On Tue, Jan 5, 2021 at 9:52 AM Kurt Young <[hidden email]> wrote:
>>>>>>>>>> > >>>
>>>>>>>>>> > >>>> Local aggregation is more like a physical operator rather than
>>>>>>>>>> > >>>> a logical operator. I would suggest going with idea #1.
>>>>>>>>>> > >>>>
>>>>>>>>>> > >>>> Best,
>>>>>>>>>> > >>>> Kurt
>>>>>>>>>> > >>>>
>>>>>>>>>> > >>>>
>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>>>>>> [hidden email]>
>>>>>>>>>> > >>>> wrote:
>>>>>>>>>> > >>>>
>>>>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>>>>>>>>>> > >>>> > For (1): Agree: Since we are in the middle of upgrading to the
>>>>>>>>>> > >>>> > new table source API, we really should use the new interfaces
>>>>>>>>>> > >>>> > for the new optimizer rule. If the new rule doesn't use the new
>>>>>>>>>> > >>>> > API, we'll have to upgrade it sooner or later. I have changed
>>>>>>>>>> > >>>> > the SupportsAggregatePushDown definition in the proposal above
>>>>>>>>>> > >>>> > to follow the ability interfaces.
>>>>>>>>>> > >>>> >
>>>>>>>>>> > >>>> > For (2): Agree: Using CallExpression is a better choice; I have
>>>>>>>>>> > >>>> > resolved this comment in the proposal.
>>>>>>>>>> > >>>> >
>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC connector, as we
>>>>>>>>>> > >>>> > don't have a Druid connector, and the ES connector has only a
>>>>>>>>>> > >>>> > sink API at present.
>>>>>>>>>> > >>>> >
>>>>>>>>>> > >>>> > But perhaps the biggest question is whether we should use
>>>>>>>>>> > >>>> > idea 1 or idea 2 in the proposal.
>>>>>>>>>> > >>>> >
>>>>>>>>>> > >>>> > What do you think? Once we reach agreement on the proposal, our
>>>>>>>>>> > >>>> > team can drive this feature to completion.
>>>>>>>>>> > >>>> >
>>>>>>>>>> > >>>> > On Tue, Dec 29, 2020 at 2:58 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>>> > >>>> >
>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>> > >>>> > >
>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great improvement
>>>>>>>>>> > >>>> > > for Flink SQL.
>>>>>>>>>> > >>>> > > I went through the design doc and have the following thoughts:
>>>>>>>>>> > >>>> > >
>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
>>>>>>>>>> > >>>> > > proposed a new set of DynamicTableSource interfaces. Could you
>>>>>>>>>> > >>>> > > update your proposal to use the new interfaces?
>>>>>>>>>> > >>>> > > Follow the existing ability interfaces, e.g.
>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>> > >>>> > >
>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a better
>>>>>>>>>> > >>>> > > representation than separate `FunctionDefinition` and args,
>>>>>>>>>> > >>>> > > because it would be easier to know the index and type of the
>>>>>>>>>> > >>>> > > arguments.
>>>>>>>>>> > >>>> > >
>>>>>>>>>> > >>>> > > 3) It would be better to list in the plan which connectors
>>>>>>>>>> > >>>> > > will be supported.
>>>>>>>>>> > >>>> > >
>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>> > >>>> > >
>>>>>>>>>> > >>>> > >
>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>>>>>>> > [hidden email]>
>>>>>>>>>> > >>>> > wrote:
>>>>>>>>>> > >>>> > >
>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>> > >>>> > > >
>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>>>>>>>>> > >>>> > > > The aggregate operator of Flink SQL is currently executed
>>>>>>>>>> > >>>> > > > entirely in the Flink layer.
>>>>>>>>>> > >>>> > > > As storage systems have developed, many downstream storage
>>>>>>>>>> > >>>> > > > systems of Flink SQL have gained the ability to handle
>>>>>>>>>> > >>>> > > > aggregation themselves.
>>>>>>>>>> > >>>> > > > Pushing the aggregate down to the data source layer will
>>>>>>>>>> > >>>> > > > improve performance in terms of network IO and computation
>>>>>>>>>> > >>>> > > > overhead.
>>>>>>>>>> > >>>> > > >
>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature:
>>>>>>>>>> > >>>> > > >
>>>>>>>>>> > >>>> > > > https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>> > >>>> > > >
>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>> > >>>> > > >
>>>>>>>>>> > >>>> > > > --
>>>>>>>>>> > >>>> > > >
>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>> > >>>> > > >
>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy
>>>>>>>>>> of Science
>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>>>>> > >>>> > > > E-mail: [hidden email] <[hidden email]
>>>>>>>>>> >
>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>> --
>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>

Re: Support local aggregate push down for Blink batch planner

Jingsong Li
Sounds good to me.

We don't have to worry about future changes, because it covers all the
capabilities of Calcite aggregation.

Best,
Jingsong

On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <[hidden email]> wrote:

> Hi Jark,
>
> Sounds good to me. For better scalability in the future, we could add the
> AggregateExpression.
> ```
>
> public class AggregateExpression implements ResolvedExpression {
>
>    private final FunctionDefinition functionDefinition;
>
>    private final List<FieldReferenceExpression> args;
>
>    private final @Nullable CallExpression filterExpression;
>
>    private final DataType resultType;
>
>    private final boolean distinct;
>
>    private final boolean approximate;
>
>
>
>    private final boolean ignoreNulls;
>
> }
> ```
>
> And we really only need one GroupingSets parameter for grouping. I have
> updated the related interface in the proposal.
> Appreciate the continued feedback and help.
>
> Jark Wu <[hidden email]> 于2021年1月6日周三 下午9:34写道:
>
>> Hi Liu, Jingsong,
>>
>> Regarding the agg with filter, I think in theory we can support pushing
>> such a pattern into source.
>> We don't need to support it in the first version, but in the long term,
>> we can support it.
>> The designed interface should be future proof.
>>
>> Considering filter arg and distinct flag should be part of the aggregate
>> expression.
>> I'm wondering if CallExpression is a good representation for it.
>> What do you think about proposing the following `AggregateExpression` to
>> replace the `CallExpression`?
>>
>> class AggregateExpression implements ResolvedExpression {
>>     private final FunctionDefinition functionDefinition;
>>     private final List<FieldReferenceExpression> args;
>>     private final @Nullable CallExpression filterExpr;
>>     private final boolean distinct;
>> }
>>
>> Besides, we don't need both groupingFields and groupingSets.
>> `groupingSets` should be a superset of groupingFields.
>> Then the interface of SupportsAggregatePushDown can be:
>>
>> interface SupportsAggregatePushDown {
>>
>>   boolean applyAggregates(
>>     List<int[]> groupingSets,
>>     List<AggregateExpression> aggregates,
>>     DataType producedDataType);
>> }
>>
>> What do you think?
>>
>> Best,
>> Jark
>>
>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <[hidden email]> wrote:
>>
>>> Hi Jingsong, Jark,
>>>
>>> Thx so much for our discussion, and the cases mentioned above are really
>>> worthy for further discussion.
>>>
>>> 1. For aggregate with filter expressions: eg: select COUNT(1)
>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>> For the current Blink Planner, the optimized plan will be:
>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg ->
>>> Exchange -> FinalAgg.
>>> As there is a Calc above the TableSource, this pattern can't match the
>>> LocalAggPushDownRule in the current design.
>>>
>>> 2. For the grouping set or rollup use case: eg: select COUNT(1) from
>>> call_center group by rollup(cc_class, cc_employees);
>>> For the current Blink Planner, the optimized plan will be:
>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
>>> It's also not covered by the current LocalAggPushDownRule design.
>>>
>>> 3. I want to add a case which we haven't discussed yet. Aggregate with
>>> Having clause.
>>> eg: select COUNT(1) from call_center group by cc_class having
>>> max(cc_tax_percentage) > 0.2;
>>> For the current Blink Planner, the optimized plan will be:
>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>
>>> The core discussion points are summarized as follows:
>>> a) Aggregate is a more complex scenario than predicates or limits, and
>>> also depends on the different underlying storages.
>>> b) One rule seems can't completely cover all aggregate scenario, but
>>> whether SupportSAggregatePushDown interface can be a bit more general?
>>> c) Could the CallExpression express the semantics of CalCite
>>> AggregateCall?
>>>
>>> IMO: Completely push down aggregate is generally hard for distributed
>>> systems. Usually we need a GROUP BY and exactly
>>> matches the partition mode in downstream storage. At the same time, the
>>> benefit of remove the final aggregate is actually limited.
>>> The LocalAggPushDown generally yields more than 80% of the CPU and IO
>>> benefits. But I also agree that
>>> the SupportsAggregatePushDown interface should be as generic as possible
>>> for future extensions, and meanwhile keep confidence
>>> in the interface we design.
>>>
>>> For core points (a): As the complexity of aggregate, one
>>> LogicalAggregate node may extend to "Expand / Calc / LocalXXAgg / Exchange
>>> / FinalXXAgg"
>>> in physical phase. Seems that we can't solve all cases with only one
>>> rule. So I suggest PushLocalAggIntoTableSourceScanRule focus only
>>> on the pattern of TableSourceScan + LocalXXAggregate at present.
>>>
>>> For core points (b & c): I think we can change the interface to be:
>>> ```
>>>
>>> boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>> aggregateExpressions, DataType producedDataType, List<int[]>
>>> groupingSets);
>>>
>>> ```
>>>
>>>
>>> Simple Group: groupingSets.size() == 1 &&
>>> groupingSets.get(0).equals(groupingFields)
>>>
>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>> groupingFields.cardinality())
>>> Rollup: Refernece org.apache.calcite.rel.core.Aggregate.Group#isRollup
>>>
>>> Then we can handle the complex grouping case. The Connector developer of
>>> the downstream storage should determine
>>> whether it supports the associated grouping type. For the filter and
>>> having clause, they will convert to be related Calc RelNode,
>>> and no longer in the LocalAggregate node, the CallExpression may be
>>> sufficient to express the semantics of AggregateCall.
>>>
>>> What do you think? Looking forward to our further discussion.
>>>
>>>
>>> Jingsong Li <[hidden email]> 于2021年1月6日周三 下午2:24写道:
>>>
>>>> > I think filter expressions and grouping sets are semantic arguments
>>>> instead of utilities. If we want to push them into sources, the connector
>>>> developers should be aware of them.Wrapping them in a context implicitly is
>>>> error-prone that the existing connector will produce wrong results when
>>>> upgrading to new Flink versions.
>>>>
>>>> We can have some mechanism to check the upgrading.
>>>>
>>>> > I think for these cases, providing a new default method to override
>>>> might be a better choice.
>>>>
>>>> Then we will have three or more methods. For the API level, I really
>>>> don't like it...
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <[hidden email]> wrote:
>>>>
>>>>> I think filter expressions and grouping sets are semantic arguments
>>>>> instead of utilities.
>>>>> If we want to push them into sources, the connector developers should
>>>>> be aware of them.
>>>>> Wrapping them in a context implicitly is error-prone: an existing
>>>>> connector would produce wrong results when upgrading to new Flink
>>>>> versions (we would be pushing grouping_sets/filter_args, but the
>>>>> connector would ignore them).
>>>>> I think for these cases, providing a new default method to override
>>>>> might be a better choice.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE
>>>>>> d > 1)). Can we push it down? I'm not sure that a single call expression
>>>>>> can express it, or how we should represent it and convey it to users.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong
>>>>>>
>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jark,
>>>>>>>
>>>>>>> I don't want to limit this interface to LocalAgg Push down.
>>>>>>> Actually, sometimes, we can push whole aggregation to source too.
>>>>>>>
>>>>>>> So, this rule can do something more advanced. For example, we can
>>>>>>> push down group sets to source too, for the SQL: "GROUP BY GROUPING SETS
>>>>>>> (f1, f2)". Then, we need to add more information to push down.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong
>>>>>>>
>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <[hidden email]> wrote:
>>>>>>>
>>>>>>>> I think this may be over designed. We should have confidence in the
>>>>>>>> interface we design, the interface should be stable.
>>>>>>>> Wrapping things in a big context has a cost of losing user
>>>>>>>> convenience.
>>>>>>>> Foremost, we don't see any parameters to add in the future. Do you
>>>>>>>> know any potential parameters?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Sebastian,
>>>>>>>>>
>>>>>>>>> Well, I mean:
>>>>>>>>>
>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>> List<CallExpression> aggregateExpressions, DataType producedDataType);`
>>>>>>>>> VS
>>>>>>>>> ```
>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>
>>>>>>>>> interface Aggregation {
>>>>>>>>>   int[] groupingFields();
>>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>>   DataType producedDataType();
>>>>>>>>> }
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Maybe I've overthought it, but I think aggregation is a complicated
>>>>>>>>> thing. We may need to extend its parameters in the future, so I would
>>>>>>>>> wrap the parameters in an interface. That allows future expansion
>>>>>>>>> without breaking the compatibility of user implementations. With the
>>>>>>>>> flat parameter list, users would need to modify their code.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong
>>>>>>>>>
>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jingsong,
>>>>>>>>>>
>>>>>>>>>> Thx a lot for your suggestion. These points really need to be
>>>>>>>>>> clear in the proposal.
>>>>>>>>>>
>>>>>>>>>> For the semantic problem, I think the main point is that the data types
>>>>>>>>>> returned by the target aggregate function may differ from the row format
>>>>>>>>>> returned by the underlying storage.
>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>> We need to let developers know that connectors must handle the semantic
>>>>>>>>>> differences between the underlying storage system and Flink.
>>>>>>>>>> [Supplemented in proposal]
>>>>>>>>>>
>>>>>>>>>> The phase of the new PushLocalAggIntoTableSourceScanRule is also a key
>>>>>>>>>> point. As you suggested, we should put it into the PHYSICAL_REWRITE rule
>>>>>>>>>> set, preferably after the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>
>>>>>>>>>> For the scalability of the interface, I don't fully understand your
>>>>>>>>>> suggestion. Is it to add an abstract class that implements the
>>>>>>>>>> SupportsAggregatePushDown interface and holds the
>>>>>>>>>> `List<CallExpression> aggregateExpressions, int[] groupingFields,
>>>>>>>>>> DataType producedDataType` fields?
>>>>>>>>>>
>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 5, 2021 at 2:44 PM Jingsong Li <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for your proposal! Sebastian.
>>>>>>>>>>>
>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful discussion
>>>>>>>>>>> has solved
>>>>>>>>>>> many of my concerns.
>>>>>>>>>>>
>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>
>>>>>>>>>>> We may need to add some mechanisms or comments, because as far
>>>>>>>>>>> as I know,
>>>>>>>>>>> the semantics of each database is actually different, which may
>>>>>>>>>>> need to be
>>>>>>>>>>> reflected in your specific implementation.
>>>>>>>>>>>
>>>>>>>>>>> For example, the AVG output types of various databases may be
>>>>>>>>>>> different. MySQL outputs double, which is different from Flink.
>>>>>>>>>>> What should we do? (Luckily, avg will be split into sum and count,
>>>>>>>>>>> but we also need to take care of decimal and other types.)
>>>>>>>>>>>
>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>
>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>> phase, which may
>>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>
>>>>>>>>>>> ## About interface
>>>>>>>>>>>
>>>>>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>>>>>> `Aggregate`
>>>>>>>>>>> interface, it contains `List<CallExpression>
>>>>>>>>>>> aggregateExpressions, int[]
>>>>>>>>>>> groupingFields, DataType producedDataType` fields. In this way,
>>>>>>>>>>> we can add
>>>>>>>>>>> fields easily without breaking compatibility.
>>>>>>>>>>>
>>>>>>>>>>> I think the current design is very good, just put forward some
>>>>>>>>>>> ideas.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>> >
>>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>> >
>>>>>>>>>>> > For (1) Agree: Yes, the upstream only needs to know whether the
>>>>>>>>>>> > TableSource can handle all of the aggregates.
>>>>>>>>>>> > It's better to just return a boolean indicating whether the push down
>>>>>>>>>>> > of all aggregates succeeded. [Resolved in proposal]
>>>>>>>>>>> >
>>>>>>>>>>> > For (2) Agree: The aggOutputDataType represents the produced data type
>>>>>>>>>>> > of the new table source, which ensures that the new table source can
>>>>>>>>>>> > connect to the related exchange node. Its layout is the types of the
>>>>>>>>>>> > grouped fields followed by the return types of the agg functions.
>>>>>>>>>>> > Passing it as a parameter also saves users from building the final
>>>>>>>>>>> > output type themselves. I have renamed this parameter to
>>>>>>>>>>> > producedDataType. [Resolved in proposal]
>>>>>>>>>>> >
>>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, so I have changed it
>>>>>>>>>>> > to groupingFields. [Resolved in proposal]
>>>>>>>>>>> >
>>>>>>>>>>> > Thx again for the suggestions; looking forward to further discussion.
>>>>>>>>>>> >
>>>>>>>>>>> > On Tue, Jan 5, 2021 at 12:05 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>> > >
>>>>>>>>>>> > > Regarding the updated interface,
>>>>>>>>>>> > >
>>>>>>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>> > >
>>>>>>>>>>> > > final class Result {
>>>>>>>>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>>>>>>>>> > >        private final List<CallExpression> remainingAggregates;
>>>>>>>>>>> > > }
>>>>>>>>>>> > >
>>>>>>>>>>> > > I have the following comments:
>>>>>>>>>>> > >
>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a boolean return
>>>>>>>>>>> > > type enough?
>>>>>>>>>>> > >     From my understanding, all of the aggregates should be accepted,
>>>>>>>>>>> > > otherwise the pushdown should fail.
>>>>>>>>>>> > >     Therefore, users don't need to distinguish which aggregates are
>>>>>>>>>>> > > "accepted".
>>>>>>>>>>> > >
>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced data type of
>>>>>>>>>>> > > the new source, or just the return type of all the agg functions?
>>>>>>>>>>> > >     I would prefer `producedDataType`, just like
>>>>>>>>>>> > > `SupportsReadingMetadata`, so that users don't have to concatenate a
>>>>>>>>>>> > > final output type themselves.
>>>>>>>>>>> > >     The return type of each agg function can be obtained from the
>>>>>>>>>>> > > `CallExpression`.
>>>>>>>>>>> > >
>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to `grouping` or
>>>>>>>>>>> > > `groupedFields`?
>>>>>>>>>>> > >     The name `groupSet` may mislead users into thinking it relates to
>>>>>>>>>>> > > "grouping sets".
>>>>>>>>>>> > >
>>>>>>>>>>> > >
>>>>>>>>>>> > > What do you think?
>>>>>>>>>>> > >
>>>>>>>>>>> > > Best,
>>>>>>>>>>> > > Jark
>>>>>>>>>>> > >
>>>>>>>>>>> > >
>>>>>>>>>>> > >
>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <[hidden email]> wrote:
>>>>>>>>>>> > >
>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>> > >>
>>>>>>>>>>> > >> Best,
>>>>>>>>>>> > >> Kurt
>>>>>>>>>>> > >>
>>>>>>>>>>> > >>
>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <[hidden email]> wrote:
>>>>>>>>>>> > >>
>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>> > >>>
>>>>>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is more like a
>>>>>>>>>>> > >>> physical operator than a logical operator, I think your suggestion
>>>>>>>>>>> > >>> should be idea #2, which handles everything in the physical
>>>>>>>>>>> > >>> optimization phase?
>>>>>>>>>>> > >>>
>>>>>>>>>>> > >>> Looking forward to further discussion.
>>>>>>>>>>> > >>>
>>>>>>>>>>> > >>>
>>>>>>>>>>> > >>> On Tue, Jan 5, 2021 at 9:52 AM Kurt Young <[hidden email]> wrote:
>>>>>>>>>>> > >>>
>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator than a logical
>>>>>>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>>>>>>> > >>>>
>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>> > >>>>
>>>>>>>>>>> > >>>>
>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <[hidden email]> wrote:
>>>>>>>>>>> > >>>>
>>>>>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>>>>>>>>>>> > >>>> > For (1): Agree: Since we are in the middle of migrating to the new
>>>>>>>>>>> > >>>> > table source API, we really should use the new interface for the
>>>>>>>>>>> > >>>> > new optimizer rule. If the new rule doesn't use the new API, we'll
>>>>>>>>>>> > >>>> > have to upgrade it sooner or later. I have changed the
>>>>>>>>>>> > >>>> > SupportsAggregatePushDown definition in the proposal above to use
>>>>>>>>>>> > >>>> > the ability interface.
>>>>>>>>>>> > >>>> >
>>>>>>>>>>> > >>>> > For (2): Agree: Using CallExpression is a better choice, and I have
>>>>>>>>>>> > >>>> > resolved this comment in the proposal.
>>>>>>>>>>> > >>>> >
>>>>>>>>>>> > >>>> > For (3): I suggest we support the JDBC connector first, as we don't
>>>>>>>>>>> > >>>> > have a Druid connector and the ES connector only has a sink API at
>>>>>>>>>>> > >>>> > present.
>>>>>>>>>>> > >>>> >
>>>>>>>>>>> > >>>> > But perhaps the biggest question is whether we should use idea 1 or
>>>>>>>>>>> > >>>> > idea 2 in the proposal.
>>>>>>>>>>> > >>>> >
>>>>>>>>>>> > >>>> > What do you think? After we reach agreement on the proposal, our
>>>>>>>>>>> > >>>> > team can drive this feature to completion.
>>>>>>>>>>> > >>>> >
>>>>>>>>>>> > >>>> > On Tue, Dec 29, 2020 at 2:58 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>>>> > >>>> >
>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>> > >>>> > >
>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great improvement for
>>>>>>>>>>> > >>>> > > Flink SQL.
>>>>>>>>>>> > >>>> > > I went through the design doc and have the following thoughts:
>>>>>>>>>>> > >>>> > >
>>>>>>>>>>> > >>>> > > 1) Flink deprecated the legacy TableSource in 1.11 and proposed a
>>>>>>>>>>> > >>>> > > new set of DynamicTableSource interfaces. Could you update your
>>>>>>>>>>> > >>>> > > proposal to use the new interfaces?
>>>>>>>>>>> > >>>> > >  Please follow the existing ability interfaces, e.g.
>>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>>> > >>>> > >
>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a better
>>>>>>>>>>> > >>>> > > representation than a separate `FunctionDefinition` and args,
>>>>>>>>>>> > >>>> > > because it would be easier to know the index and type of the
>>>>>>>>>>> > >>>> > > arguments.
>>>>>>>>>>> > >>>> > >
>>>>>>>>>>> > >>>> > > 3) Could you list in the plan which connectors will be supported?
>>>>>>>>>>> > >>>> > >
>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>> > >>>> > >
>>>>>>>>>>> > >>>> > >
>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <[hidden email]> wrote:
>>>>>>>>>>> > >>>> > >
>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>>>>>>>>>> > >>>> > > > The aggregate operator of Flink SQL is currently executed
>>>>>>>>>>> > >>>> > > > entirely in the Flink layer.
>>>>>>>>>>> > >>>> > > > As storage systems have evolved, many of the storage systems
>>>>>>>>>>> > >>>> > > > behind Flink SQL can now evaluate aggregations themselves.
>>>>>>>>>>> > >>>> > > > Pushing the aggregate down to the data source layer will improve
>>>>>>>>>>> > >>>> > > > performance by reducing network IO and computation overhead.
>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>> > >>>> > > > https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy
>>>>>>>>>>> of Science
>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>>>>>> > >>>> > > > E-mail: [hidden email] <
>>>>>>>>>>> [hidden email]>
>>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>> > >>>> > >
>>>>>>>>>>> > >>>> >
>>>>>>>>>>> > >>>> >
>>>>>>>>>>> > >>>> >
>>>>>>>>>>> > >>>>
>>>>>>>>>>> > >>>
>>>>>>>>>>> > >>>
>>>>>>>>>>> > >>>
>>>>>>>>>>> > >>>
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>
>>>
>>>
>>>
>
>
>

--
Best, Jingsong Lee

Re: Support local aggregate push down for Blink batch planner

Jark Wu-2
Thanks for updating the design doc.
It looks good to me.

Best,
Jark

On Thu, 7 Jan 2021 at 10:16, Jingsong Li <[hidden email]> wrote:

> Sounds good to me.
>
> We don't have to worry about future changes, because it has covered all
> the capabilities of calcite aggregation.
>
> Best,
> Jingsong
>
> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <[hidden email]>
> wrote:
>
>> Hi Jark,
>>
>> Sounds good to me. For better scalability in the future, we could add the
>> AggregateExpression.
>> ```
>>
>> public class AggregateExpression implements ResolvedExpression {
>>
>>    private final FunctionDefinition functionDefinition;
>>
>>    private final List<FieldReferenceExpression> args;
>>
>>    private final @Nullable CallExpression filterExpression;
>>
>>    private final DataType resultType;
>>
>>    private final boolean distinct;
>>
>>    private final boolean approximate;
>>
>>
>>
>>    private final boolean ignoreNulls;
>>
>> }
>> ```
>>
>> And we really only need one GroupingSets parameter for grouping. I have
>> updated the related interface in the proposal.
>> Appreciate the continued feedback and help.
>>
>> On Wed, Jan 6, 2021 at 9:34 PM Jark Wu <[hidden email]> wrote:
>>
>>> Hi Liu, Jingsong,
>>>
>>> Regarding the agg with filter, I think in theory we can support pushing
>>> such a pattern into source.
>>> We don't need to support it in the first version, but in the long term,
>>> we can support it.
>>> The designed interface should be future proof.
>>>
>>> Considering that the filter arg and the distinct flag should be part of
>>> the aggregate expression, I'm wondering whether CallExpression is a good
>>> representation for it.
>>> What do you think about proposing the following `AggregateExpression` to
>>> replace the `CallExpression`?
>>>
>>> class AggregateExpression implements ResolvedExpression {
>>>     private final FunctionDefinition functionDefinition;
>>>     private final List<FieldReferenceExpression> args;
>>>     private final @Nullable CallExpression filterExpr;
>>>     private final boolean distinct;
>>> }
>>>
>>> Besides, we don't need both groupingFields and groupingSets.
>>> `groupingSets` should be a superset of groupingFields.
>>> Then the interface of SupportsAggregatePushDown can be:
>>>
>>> interface SupportsAggregatePushDown {
>>>
>>>   boolean applyAggregates(
>>>     List<int[]> groupingSets,
>>>     List<AggregateExpression> aggregates,
>>>     DataType producedDataType);
>>> }
>>>
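To make the all-or-nothing boolean contract of the interface quoted above concrete, here is a minimal sketch of what a connector-side implementation might look like. It is not Flink code: `AggPushDownSketch`, `SimpleSource`, and the simplified `AggregateExpression` (a function name, field indexes, an optional filter string, a distinct flag) are hypothetical stand-ins, and a plain `String` stands in for `DataType`, so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class AggPushDownSketch {
    /** Hypothetical stand-in for the proposed AggregateExpression; not Flink's class. */
    static final class AggregateExpression {
        final String functionName; // e.g. "SUM"; stands in for FunctionDefinition
        final int[] args;          // input field indexes; stand in for FieldReferenceExpression
        final String filterExpr;   // null when the call has no FILTER clause
        final boolean distinct;

        AggregateExpression(String functionName, int[] args, String filterExpr, boolean distinct) {
            this.functionName = functionName;
            this.args = args;
            this.filterExpr = filterExpr;
            this.distinct = distinct;
        }
    }

    /** Same shape as the interface in the message, with String standing in for DataType. */
    interface SupportsAggregatePushDown {
        boolean applyAggregates(
                List<int[]> groupingSets,
                List<AggregateExpression> aggregates,
                String producedDataType);
    }

    /** A source that accepts only plain SUM/COUNT/MIN/MAX over a single grouping set. */
    static final class SimpleSource implements SupportsAggregatePushDown {
        private static final Set<String> SUPPORTED =
                new HashSet<>(Arrays.asList("SUM", "COUNT", "MIN", "MAX"));

        List<AggregateExpression> pushedAggregates = new ArrayList<>();
        int[] pushedGrouping = new int[0];
        String producedDataType;

        @Override
        public boolean applyAggregates(
                List<int[]> groupingSets,
                List<AggregateExpression> aggregates,
                String producedDataType) {
            if (groupingSets.size() != 1) {
                return false; // ROLLUP / CUBE / GROUPING SETS stay in Flink
            }
            for (AggregateExpression agg : aggregates) {
                if (agg.distinct || agg.filterExpr != null || !SUPPORTED.contains(agg.functionName)) {
                    return false; // one unsupported call rejects the whole push down
                }
            }
            // Only record state after every aggregate has been validated.
            this.pushedGrouping = groupingSets.get(0).clone();
            this.pushedAggregates = new ArrayList<>(aggregates);
            this.producedDataType = producedDataType;
            return true;
        }
    }
}
```

Because the source validates everything before recording any state, a `false` return leaves it untouched and the planner can keep the original LocalAgg in place.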
>>> What do you think?
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <[hidden email]>
>>> wrote:
>>>
>>>> Hi Jingsong, Jark,
>>>>
>>>> Thx so much for our discussion, and the cases mentioned above are
>>>> really worthy for further discussion.
>>>>
>>>> 1. For aggregate with filter expressions: eg: select COUNT(1)
>>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>> For the current Blink Planner, the optimized plan will be:
>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg
>>>> -> Exchange -> FinalAgg.
>>>> As there is a Calc above the TableSource, this pattern can't match the
>>>> LocalAggPushDownRule in the current design.
>>>>
>>>> 2. For the grouping set or rollup use case: eg: select COUNT(1) from
>>>> call_center group by rollup(cc_class, cc_employees);
>>>> For the current Blink Planner, the optimized plan will be:
>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>
>>>> 3. I want to add a case which we haven't discussed yet. Aggregate with
>>>> Having clause.
>>>> eg: select COUNT(1) from call_center group by cc_class having
>>>> max(cc_tax_percentage) > 0.2;
>>>> For the current Blink Planner, the optimized plan will be:
>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>
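The three plan shapes quoted above can be checked mechanically. Below is a minimal sketch, assuming each optimized plan is flattened into a list of node names from source to sink. `PlanPatternSketch` and `matchesLocalAggPushDown` are hypothetical names for illustration only; the actual rule would match RelNodes in the planner, not strings.

```java
import java.util.Arrays;
import java.util.List;

final class PlanPatternSketch {
    /**
     * Returns true when a LocalAgg sits directly on top of the TableSourceScan,
     * which is the only shape the proposed rule is meant to match.
     */
    static boolean matchesLocalAggPushDown(List<String> plan) {
        int scan = plan.indexOf("TableSourceScan");
        return scan >= 0
                && scan + 1 < plan.size()
                && plan.get(scan + 1).equals("LocalAgg");
    }

    // Flattened versions of the three optimized plans from the message.
    static final List<String> FILTER_AGG =
            Arrays.asList("TableSourceScan", "Calc", "LocalAgg", "Exchange", "FinalAgg");
    static final List<String> ROLLUP_AGG =
            Arrays.asList("TableSourceScan", "Expand", "LocalAgg", "Exchange", "FinalAgg", "Calc");
    static final List<String> HAVING_AGG =
            Arrays.asList("TableSourceScan", "LocalAgg", "Exchange", "FinalAgg", "Calc");
}
```

Under this simplification only the HAVING plan matches, because its Calc sits above the FinalAgg; in the FILTER and ROLLUP plans a Calc or Expand node separates the scan from the LocalAgg.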
>>>> The core discussion points are summarized as follows:
>>>> a) Aggregate is a more complex scenario than predicates or limits, and
>>>> also depends on the capabilities of the underlying storage.
>>>> b) One rule can't seem to cover every aggregate scenario, but can the
>>>> SupportsAggregatePushDown interface be a bit more general?
>>>> c) Can CallExpression express the semantics of Calcite's
>>>> AggregateCall?
>>>>
>>>> IMO: Pushing an aggregate down completely is generally hard for
>>>> distributed systems. Usually we need a GROUP BY that exactly matches the
>>>> partitioning of the downstream storage. At the same time, the benefit of
>>>> removing the final aggregate is actually limited.
>>>> The LocalAggPushDown generally yields more than 80% of the CPU and IO
>>>> benefits. But I also agree that the SupportsAggregatePushDown interface
>>>> should be as generic as possible for future extensions, while we keep
>>>> confidence in the interface we design.
>>>>
>>>> For core point (a): Because aggregates are complex, one
>>>> LogicalAggregate node may expand to "Expand / Calc / LocalXXAgg / Exchange
>>>> / FinalXXAgg" in the physical phase. It seems that we can't solve all cases
>>>> with only one rule, so I suggest that PushLocalAggIntoTableSourceScanRule
>>>> focus only on the TableSourceScan + LocalXXAggregate pattern at present.
>>>>
>>>> For core points (b & c): I think we can change the interface to be:
>>>> ```
>>>>
>>>> boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>> aggregateExpressions, DataType producedDataType, List<int[]>
>>>> groupingSets);
>>>>
>>>> ```
>>>>
>>>>
>>>> Simple Group: groupingSets.size() == 1 &&
>>>> Arrays.equals(groupingSets.get(0), groupingFields)
>>>>
>>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>>> groupingFields.length)
>>>> Rollup: see org.apache.calcite.rel.core.Aggregate.Group#isRollup
>>>>
>>>> Then we can handle the complex grouping case. The connector developer
>>>> for the downstream storage should determine whether it supports the
>>>> associated grouping type. The filter and having clauses are converted
>>>> into Calc RelNodes and are no longer part of the LocalAggregate node, so
>>>> CallExpression may be sufficient to express the semantics of
>>>> AggregateCall.
>>>>
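The grouping checks listed above could be written as a small classifier. This is only a sketch under stated assumptions: grouping sets are `int[]` arrays of field indexes, the sets are distinct subsets of `groupingFields`, and a rollup lists its sets from the full key down to the empty set. `GroupingKinds` and its `isRollup` are hypothetical helpers and deliberately stricter and simpler than the Calcite method referenced in the message.

```java
import java.util.Arrays;
import java.util.List;

final class GroupingKinds {
    enum Kind { SIMPLE, CUBE, ROLLUP, OTHER }

    /** Applies the checks in the same order as they are listed in the message. */
    static Kind classify(int[] groupingFields, List<int[]> groupingSets) {
        int n = groupingFields.length;
        // int[] has no value-based equals(), so compare contents explicitly.
        if (groupingSets.size() == 1 && Arrays.equals(groupingSets.get(0), groupingFields)) {
            return Kind.SIMPLE;
        }
        // Assumes distinct subsets of groupingFields, so 2^n of them means every subset.
        if (n < 31 && groupingSets.size() == (1 << n)) {
            return Kind.CUBE;
        }
        if (isRollup(groupingFields, groupingSets)) {
            return Kind.ROLLUP;
        }
        return Kind.OTHER;
    }

    /** Simplified rollup test: set i must equal the first (n - i) grouping fields. */
    static boolean isRollup(int[] groupingFields, List<int[]> groupingSets) {
        int n = groupingFields.length;
        if (groupingSets.size() != n + 1) {
            return false;
        }
        for (int i = 0; i <= n; i++) {
            int[] expectedPrefix = Arrays.copyOf(groupingFields, n - i);
            if (!Arrays.equals(groupingSets.get(i), expectedPrefix)) {
                return false;
            }
        }
        return true;
    }
}
```

A connector could reject anything other than `SIMPLE` in a first version and widen support later.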
>>>> What do you think? Looking forward to our further discussion.
>>>>
>>>>
>>>> On Wed, Jan 6, 2021 at 2:24 PM Jingsong Li <[hidden email]> wrote:
>>>>
>>>>> > I think filter expressions and grouping sets are semantic arguments
>>>>> > instead of utilities. If we want to push them into sources, the connector
>>>>> > developers should be aware of them. Wrapping them in a context implicitly is
>>>>> > error-prone: an existing connector would produce wrong results when
>>>>> > upgrading to new Flink versions.
>>>>>
>>>>> We can have some mechanism to check the upgrading.
>>>>>
>>>>> > I think for these cases, providing a new default method to override
>>>>> > might be a better choice.
>>>>>
>>>>> Then we will have three or more methods. For the API level, I really
>>>>> don't like it...
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <[hidden email]> wrote:
>>>>>
>>>>>> I think filter expressions and grouping sets are semantic arguments
>>>>>> instead of utilities.
>>>>>> If we want to push them into sources, the connector developers should
>>>>>> be aware of them.
>>>>>> Wrapping them in a context implicitly is error-prone: an existing
>>>>>> connector would produce wrong results when upgrading to new Flink
>>>>>> versions (we would be pushing grouping_sets/filter_args, but the
>>>>>> connector would ignore them).
>>>>>> I think for these cases, providing a new default method to override
>>>>>> might be a better choice.
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE
>>>>>>> d > 1)). Can we push it down? I'm not sure that a single call expression
>>>>>>> can express it, and how we should embody it and convey it to users.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong
>>>>>>>
>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <[hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jark,
>>>>>>>>
>>>>>>>> I don't want to limit this interface to LocalAgg Push down.
>>>>>>>> Actually, sometimes, we can push whole aggregation to source too.
>>>>>>>>
>>>>>>>> So, this rule can do something more advanced. For example, we can
>>>>>>>> push down group sets to source too, for the SQL: "GROUP BY GROUPING SETS
>>>>>>>> (f1, f2)". Then, we need to add more information to push down.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong
>>>>>>>>
>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> I think this may be over designed. We should have confidence in
>>>>>>>>> the interface we design, the interface should be stable.
>>>>>>>>> Wrapping things in a big context has a cost of losing user
>>>>>>>>> convenience.
>>>>>>>>> Foremost, we don't see any parameters to add in the future. Do you
>>>>>>>>> know any potential parameters?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <[hidden email]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>
>>>>>>>>>> Well, I mean:
>>>>>>>>>>
>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType producedDataType);`
>>>>>>>>>> VS
>>>>>>>>>> ```
>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>
>>>>>>>>>> interface Aggregation {
>>>>>>>>>>   int[] groupingFields();
>>>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>>>   DataType producedDataType();
>>>>>>>>>> }
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> Maybe I've overthought it, but I think aggregation is a complicated
>>>>>>>>>> thing. We may need to extend its parameters in the future, so I would
>>>>>>>>>> wrap the parameters in an interface. That allows future expansion
>>>>>>>>>> without breaking the compatibility of user implementations. With the
>>>>>>>>>> flat parameter list, users would need to modify their code.
>>>>>>>>>>
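The trade-off between the two signatures quoted above can be sketched in a few lines. The example assumes a later version adds a `groupingSets()` accessor to the parameter object with a default implementation; that accessor, `OldConnector`, and `rollupRequest` are hypothetical, and `String` stands in for `CallExpression` and `DataType`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class AggregationContextSketch {
    /** Parameter object from the message, with String standing in for Flink types. */
    interface Aggregation {
        int[] groupingFields();
        List<String> aggregateExpressions();
        String producedDataType();

        // Hypothetical later addition. The default keeps code written against the
        // three-accessor version compiling without changes.
        default List<int[]> groupingSets() {
            return Collections.singletonList(groupingFields());
        }
    }

    interface SupportsAggregatePushDown {
        boolean applyAggregates(Aggregation agg);
    }

    /** A connector written before groupingSets() existed; it never reads the new accessor. */
    static final class OldConnector implements SupportsAggregatePushDown {
        @Override
        public boolean applyAggregates(Aggregation agg) {
            return agg.aggregateExpressions().stream().allMatch(e -> e.startsWith("COUNT"));
        }
    }

    /** What a planner might pass for GROUP BY ROLLUP(f0, f1) once the accessor exists. */
    static Aggregation rollupRequest() {
        return new Aggregation() {
            @Override
            public int[] groupingFields() {
                return new int[] {0, 1};
            }

            @Override
            public List<String> aggregateExpressions() {
                return Collections.singletonList("COUNT(1)");
            }

            @Override
            public String producedDataType() {
                return "ROW<f0 INT, f1 INT, cnt BIGINT>";
            }

            @Override
            public List<int[]> groupingSets() {
                return Arrays.asList(new int[] {0, 1}, new int[] {0}, new int[] {});
            }
        };
    }
}
```

The sketch shows both sides of the argument: `OldConnector` keeps compiling after the addition, but it also returns `true` for a rollup request whose extra grouping sets it never inspects.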
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jingsong,
>>>>>>>>>>>
>>>>>>>>>>> Thx a lot for your suggestion. These points really need to be
>>>>>>>>>>> clear in the proposal.
>>>>>>>>>>>
>>>>>>>>>>> For the semantic problem, I think the main point is that the data types
>>>>>>>>>>> returned by the target aggregate function may differ from the row format
>>>>>>>>>>> returned by the underlying storage.
>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>> We need to let developers know that connectors must handle the semantic
>>>>>>>>>>> differences between the underlying storage system and Flink.
>>>>>>>>>>> [Supplemented in proposal]
>>>>>>>>>>>
>>>>>>>>>>> The phase of the new PushLocalAggIntoTableSourceScanRule is also a key
>>>>>>>>>>> point. As you suggested, we should put it into the PHYSICAL_REWRITE rule
>>>>>>>>>>> set, preferably after the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>>
>>>>>>>>>>> For the scalability of the interface, I don't fully understand your
>>>>>>>>>>> suggestion. Is it to add an abstract class that implements the
>>>>>>>>>>> SupportsAggregatePushDown interface and holds the
>>>>>>>>>>> `List<CallExpression> aggregateExpressions, int[] groupingFields,
>>>>>>>>>>> DataType producedDataType` fields?
>>>>>>>>>>>
>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 5, 2021 at 2:44 PM Jingsong Li <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your proposal! Sebastian.
>>>>>>>>>>>>
>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful
>>>>>>>>>>>> discussion has solved
>>>>>>>>>>>> many of my concerns.
>>>>>>>>>>>>
>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>
>>>>>>>>>>>> We may need to add some mechanisms or comments, because as far
>>>>>>>>>>>> as I know,
>>>>>>>>>>>> the semantics of each database is actually different, which may
>>>>>>>>>>>> need to be
>>>>>>>>>>>> reflected in your specific implementation.
>>>>>>>>>>>>
>>>>>>>>>>>> For example, the AVG output types of various databases may be
>>>>>>>>>>>> different. MySQL outputs double, which is different from Flink.
>>>>>>>>>>>> What should we do? (Luckily, avg will be split into sum and count,
>>>>>>>>>>>> but we also need to take care of decimal and other types.)
>>>>>>>>>>>>
>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>
>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>> phase, which may
>>>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>
>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>
>>>>>>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>>>>>>> `Aggregate`
>>>>>>>>>>>> interface, it contains `List<CallExpression>
>>>>>>>>>>>> aggregateExpressions, int[]
>>>>>>>>>>>> groupingFields, DataType producedDataType` fields. In this way,
>>>>>>>>>>>> we can add
>>>>>>>>>>>> fields easily without breaking compatibility.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the current design is very good, just put forward some
>>>>>>>>>>>> ideas.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>>> >
>>>>>>>>>>>> > For (1) Agree: Yeah, the upstream only need to know if the
>>>>>>>>>>>> TableSource can
>>>>>>>>>>>> > handle all of the aggregates.
>>>>>>>>>>>> > It's better to just return a boolean type to indicate whether
>>>>>>>>>>>> all of
>>>>>>>>>>>> > aggregates push down was successful or not. [Resolved in
>>>>>>>>>>>> proposal]
>>>>>>>>>>>> >
>>>>>>>>>>>> > For (2) Agree: The aggOutputDataType represent the produced
>>>>>>>>>>>> data type of
>>>>>>>>>>>> > the new table source to make sure that the new table source
>>>>>>>>>>>> can
>>>>>>>>>>>> > connect with the related exchange node. The format of this
>>>>>>>>>>>> > aggOutputDataType is groupedFields's type + agg function's
>>>>>>>>>>>> return type.
>>>>>>>>>>>> > The reason for adding this parameter in this function is also
>>>>>>>>>>>> to facilitate
>>>>>>>>>>>> > the user to build the final output type. I have changed this
>>>>>>>>>>>> parameter
>>>>>>>>>>>> > to be producedDataType. [Resolved in proposal]
>>>>>>>>>>>> >
>>>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have
>>>>>>>>>>>> changed to use
>>>>>>>>>>>> > groupingFields. [Resolved in proposal]
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thx again for the suggestion, looking forward to further
>>>>>>>>>>>> discussion.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Jark Wu <[hidden email]> wrote on Tue, Jan 5, 2021 at 12:05 PM:
>>>>>>>>>>>> >
>>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Regarding to the updated interface,
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression>
>>>>>>>>>>>> aggregateExpressions,
>>>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>> > >        private final List<CallExpression>
>>>>>>>>>>>> acceptedAggregates;
>>>>>>>>>>>> > >        private final List<CallExpression>
>>>>>>>>>>>> remainingAggregates;
>>>>>>>>>>>> > > }
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > I have following comments:
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a
>>>>>>>>>>>> boolean return type
>>>>>>>>>>>> > > enough?
>>>>>>>>>>>> > >     From my understanding, all of the aggregates should be
>>>>>>>>>>>> accepted,
>>>>>>>>>>>> > > otherwise the pushdown should fail.
>>>>>>>>>>>> > >     Therefore, users don't need to distinguish which
>>>>>>>>>>>> aggregates are
>>>>>>>>>>>> > > "accepted".
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced data
>>>>>>>>>>>> type of the
>>>>>>>>>>>> > > new source, or just the return type of all the agg
>>>>>>>>>>>> functions?
>>>>>>>>>>>> > >     I would prefer to `producedDataType` just like
>>>>>>>>>>>> > > `SupportsReadingMetadata` to reduce the effort for users to
>>>>>>>>>>>> concat a
>>>>>>>>>>>> > final
>>>>>>>>>>>> > > output type.
>>>>>>>>>>>> > >     The return type of each agg function can be obtained
>>>>>>>>>>>> from the
>>>>>>>>>>>> > > `CallExpression`.
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to
>>>>>>>>>>>> `grouping` or
>>>>>>>>>>>> > > `groupedFields` ?
>>>>>>>>>>>> > >     The `groupSet` may confuse users that it relates to
>>>>>>>>>>>> "grouping sets".
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Best,
>>>>>>>>>>>> > > Jark
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <[hidden email]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>> > >>
>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>> > >>
>>>>>>>>>>>> > >>
>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>> > >> wrote:
>>>>>>>>>>>> > >>
>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>> > >>>
>>>>>>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is more
>>>>>>>>>>>> like a
>>>>>>>>>>>> > >>> physical operator rather than logical
>>>>>>>>>>>> > >>> operator, I think your suggestion should be idea #2 which
>>>>>>>>>>>> handle all in
>>>>>>>>>>>> > >>> the physical optimization phase?
>>>>>>>>>>>> > >>>
>>>>>>>>>>>> > >>> Looking forward to further discussion.
>>>>>>>>>>>> > >>>
>>>>>>>>>>>> > >>>
>>>>>>>>>>>> > >>> Kurt Young <[hidden email]> wrote on Tue, Jan 5, 2021 at 9:52 AM:
>>>>>>>>>>>> > >>>
>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator
>>>>>>>>>>>> rather than logical
>>>>>>>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>>>>>>>> > >>>>
>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>> > >>>>
>>>>>>>>>>>> > >>>>
>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>> > >>>> wrote:
>>>>>>>>>>>> > >>>>
>>>>>>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable
>>>>>>>>>>>> suggestions.
>>>>>>>>>>>> > >>>> > For (1): Agree: Since we are in the period of
>>>>>>>>>>>> upgrading the new
>>>>>>>>>>>> > table
>>>>>>>>>>>> > >>>> > source api,
>>>>>>>>>>>> > >>>> > we really should consider the new interface for the
>>>>>>>>>>>> new optimize
>>>>>>>>>>>> > >>>> rule. If
>>>>>>>>>>>> > >>>> > the new rule
>>>>>>>>>>>> > >>>> > doesn't use the new api, we'll have to upgrade it
>>>>>>>>>>>> sooner or later. I
>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>> > >>>> > changed to use
>>>>>>>>>>>> > >>>> > the ability interface for the
>>>>>>>>>>>> SupportsAggregatePushDown definition
>>>>>>>>>>>> > in
>>>>>>>>>>>> > >>>> above
>>>>>>>>>>>> > >>>> > proposal.
>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>> > >>>> > For (2): Agree: Change to use CallExpression is a
>>>>>>>>>>>> better choice, and
>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>> > >>>> > resolved this
>>>>>>>>>>>> > >>>> > comment in the proposal.
>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC
>>>>>>>>>>>> connector, as we don't
>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>> > >>>> > Druid connector
>>>>>>>>>>>> > >>>> > and ES connector just has sink api at present.
>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>> > >>>> > But perhaps the biggest question may be whether we
>>>>>>>>>>>> should use idea 1
>>>>>>>>>>>> > >>>> or
>>>>>>>>>>>> > >>>> > idea 2 in proposal.
>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>> > >>>> > What do you think?  After we reach the agreement on
>>>>>>>>>>>> the proposal,
>>>>>>>>>>>> > our
>>>>>>>>>>>> > >>>> team
>>>>>>>>>>>> > >>>> > can drive to
>>>>>>>>>>>> > >>>> > complete this feature.
>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>> > >>>> > Jark Wu <[hidden email]> wrote on Tue, Dec 29, 2020 at 2:58 PM:
>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>>>>>>>>> improvement for
>>>>>>>>>>>> > >>>> Flink
>>>>>>>>>>>> > >>>> > SQL.
>>>>>>>>>>>> > >>>> > > I went through the design doc and have following
>>>>>>>>>>>> thoughts:
>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in
>>>>>>>>>>>> 1.11 and
>>>>>>>>>>>> > proposed
>>>>>>>>>>>> > >>>> a new
>>>>>>>>>>>> > >>>> > >  set of DynamicTableSource interfaces. Could you
>>>>>>>>>>>> update your
>>>>>>>>>>>> > >>>> proposal to
>>>>>>>>>>>> > >>>> > > use the new interfaces?
>>>>>>>>>>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a
>>>>>>>>>>>> better
>>>>>>>>>>>> > >>>> representation
>>>>>>>>>>>> > >>>> > than
>>>>>>>>>>>> > >>>> > > separate `FunctionDefinition` and args. Because, it
>>>>>>>>>>>> would be
>>>>>>>>>>>> > easier
>>>>>>>>>>>> > >>>> to
>>>>>>>>>>>> > >>>> > know
>>>>>>>>>>>> > >>>> > > what's the index and type of the arguments.
>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>> > >>>> > > 3) It would be better to list which connectors will
>>>>>>>>>>>> be supported
>>>>>>>>>>>> > in
>>>>>>>>>>>> > >>>> the
>>>>>>>>>>>> > >>>> > > plan?
>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>>>>>>>>> > [hidden email]>
>>>>>>>>>>>> > >>>> > wrote:
>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink
>>>>>>>>>>>> Planner.
>>>>>>>>>>>> > >>>> > > > Aggregate operator of Flink SQL is currently fully
>>>>>>>>>>>> done at Flink
>>>>>>>>>>>> > >>>> layer.
>>>>>>>>>>>> > >>>> > > > As storage systems have developed, many downstream
>>>>>>>>>>>> storage systems of Flink
>>>>>>>>>>>> > >>>> SQL
>>>>>>>>>>>> > >>>> > now have
>>>>>>>>>>>> > >>>> > > > the ability to execute aggregations themselves.
>>>>>>>>>>>> > >>>> > > > Pushing down Aggregate to data source layer will
>>>>>>>>>>>> improve
>>>>>>>>>>>> > >>>> performance
>>>>>>>>>>>> > >>>> > from
>>>>>>>>>>>> > >>>> > > > the perspective of the network IO and computation
>>>>>>>>>>>> overhead.
>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>> > >>>>
>>>>>>>>>>>> >
>>>>>>>>>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy
>>>>>>>>>>>> of Science
>>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>> > >>>> > > > E-mail: [hidden email] <
>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>> --
>>>>>>>>>>>> Best, Jingsong Lee

Re: Support local aggregate push down for Blink batch planner

Sebastian Liu
Hi Jark,

It seems that we have reached agreement on the proposal. Could you please
assign the Jira ticket below to me?
https://issues.apache.org/jira/browse/FLINK-20791

Jark Wu <[hidden email]> wrote on Thu, Jan 7, 2021 at 10:25 AM:

> Thanks for updating the design doc.
> It looks good to me.
>
> Best,
> Jark
>
> On Thu, 7 Jan 2021 at 10:16, Jingsong Li <[hidden email]> wrote:
>
>> Sounds good to me.
>>
>> We don't have to worry about future changes, because it has covered all
>> the capabilities of calcite aggregation.
>>
>> Best,
>> Jingsong
>>
>> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <[hidden email]>
>> wrote:
>>
>>> Hi Jark,
>>>
>>> Sounds good to me. For better scalability in the future, we could add
>>> the AggregateExpression.
>>> ```
>>> public class AggregateExpression implements ResolvedExpression {
>>>     private final FunctionDefinition functionDefinition;
>>>     private final List<FieldReferenceExpression> args;
>>>     private final @Nullable CallExpression filterExpression;
>>>     private final DataType resultType;
>>>     private final boolean distinct;
>>>     private final boolean approximate;
>>>     private final boolean ignoreNulls;
>>> }
>>> ```
>>>
>>> And we really only need one GroupingSets parameter for grouping. I have
>>> updated the related interface in the proposal.
>>> Appreciate the continued feedback and help.
>>>
>>> Jark Wu <[hidden email]> wrote on Wed, Jan 6, 2021 at 9:34 PM:
>>>
>>>> Hi Liu, Jingsong,
>>>>
>>>> Regarding the agg with filter, I think in theory we can support pushing
>>>> such a pattern into source.
>>>> We don't need to support it in the first version, but in the long term,
>>>> we can support it.
>>>> The designed interface should be future proof.
>>>>
>>>> Considering that the filter arg and distinct flag should be part of the
>>>> aggregate expression,
>>>> I'm wondering if CallExpression is a good representation for it.
>>>> What do you think about proposing the following `AggregateExpression`
>>>> to replace the `CallExpression`?
>>>>
>>>> class AggregateExpression implements ResolvedExpression {
>>>>     private final FunctionDefinition functionDefinition;
>>>>     private final List<FieldReferenceExpression> args;
>>>>     private final @Nullable CallExpression filterExpr;
>>>>     private final boolean distinct;
>>>> }
>>>>
>>>> Besides, we don't need both groupingFields and groupingSets.
>>>> `groupingSets` should be a superset of groupingFields.
>>>> Then the interface of SupportsAggregatePushDown can be:
>>>>
>>>> interface SupportsAggregatePushDown {
>>>>
>>>>   boolean applyAggregates(
>>>>     List<int[]> groupingSets,
>>>>     List<AggregateExpression> aggregates,
>>>>     DataType producedDataType);
>>>> }
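
To make the contract above concrete, here is a minimal sketch of how a connector might implement it. Everything in it is an assumption for illustration: `AggCallSketch` is a hypothetical stand-in for the proposed `AggregateExpression`, a `String` replaces `DataType`, and `JdbcLikeSourceSketch` is not an existing Flink class. The source accepts a single grouping set with plain aggregates and returns false for anything else, so the planner would keep the local aggregate in Flink.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the proposed AggregateExpression; not the Flink class. */
final class AggCallSketch {
    final String functionName; // e.g. "SUM", "COUNT"
    final int[] argFields;     // indices of the referenced input fields
    final boolean distinct;
    final boolean hasFilter;   // true if a FILTER (WHERE ...) clause is attached

    AggCallSketch(String functionName, int[] argFields, boolean distinct, boolean hasFilter) {
        this.functionName = functionName;
        this.argFields = argFields;
        this.distinct = distinct;
        this.hasFilter = hasFilter;
    }
}

/** Same shape as the interface in the mail; String replaces DataType in this sketch. */
interface AggregatePushDownSketch {
    boolean applyAggregates(
            List<int[]> groupingSets, List<AggCallSketch> aggregates, String producedDataType);
}

/** Hypothetical source that accepts only what its backing store can compute. */
final class JdbcLikeSourceSketch implements AggregatePushDownSketch {
    private static final List<String> SUPPORTED = Arrays.asList("SUM", "COUNT", "MIN", "MAX");

    int[] pushedGrouping; // stays null until a push-down is accepted
    List<AggCallSketch> pushedAggregates;
    String producedDataType;

    @Override
    public boolean applyAggregates(
            List<int[]> groupingSets, List<AggCallSketch> aggregates, String producedDataType) {
        // Only a plain GROUP BY (exactly one grouping set) is handled here.
        if (groupingSets.size() != 1) {
            return false;
        }
        // All aggregates must be accepted, otherwise the whole push-down is rejected.
        for (AggCallSketch call : aggregates) {
            if (call.distinct || call.hasFilter || !SUPPORTED.contains(call.functionName)) {
                return false;
            }
        }
        this.pushedGrouping = groupingSets.get(0).clone();
        this.pushedAggregates = aggregates;
        this.producedDataType = producedDataType;
        return true;
    }
}
```

The all-or-nothing boolean follows the earlier point in this thread that every aggregate must be accepted or the push-down fails as a whole.
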
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <[hidden email]>
>>>> wrote:
>>>>
>>>>> Hi Jingsong, Jark,
>>>>>
>>>>> Thx so much for the discussion. The cases mentioned above are
>>>>> really worth discussing further.
>>>>>
>>>>> 1. For aggregate with filter expressions: eg: select COUNT(1)
>>>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>>> For the current Blink Planner, the optimized plan will be:
>>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg
>>>>> -> Exchange -> FinalAgg.
>>>>> As there is a Calc above the TableSource, this pattern can't match the
>>>>> LocalAggPushDownRule in the current design.
>>>>>
>>>>> 2. For the grouping set or rollup use case: eg: select COUNT(1) from
>>>>> call_center group by rollup(cc_class, cc_employees);
>>>>> For the current Blink Planner, the optimized plan will be:
>>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
>>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>>
>>>>> 3. I want to add a case which we haven't discussed yet. Aggregate with
>>>>> Having clause.
>>>>> eg: select COUNT(1) from call_center group by cc_class having
>>>>> max(cc_tax_percentage) > 0.2;
>>>>> For the current Blink Planner, the optimized plan will be:
>>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>>
>>>>> The core discussion points are summarized as follows:
>>>>> a) Aggregate is a more complex scenario than predicates or limits, and
>>>>> also depends on the different underlying storages.
>>>>> b) One rule can't seem to cover every aggregate scenario, but
>>>>> could the SupportsAggregatePushDown interface be a bit more general?
>>>>> c) Could the CallExpression express the semantics of Calcite's
>>>>> AggregateCall?
>>>>>
>>>>> IMO: Pushing an aggregate down completely is generally hard for
>>>>> distributed systems. Usually it requires a GROUP BY that exactly
>>>>> matches the partitioning of the downstream storage. At the same time,
>>>>> the benefit of removing the final aggregate is actually limited.
>>>>> The LocalAggPushDown generally yields more than 80% of the CPU and IO
>>>>> benefits. But I also agree that
>>>>> the SupportsAggregatePushDown interface should be as generic as
>>>>> possible for future extensions, and meanwhile keep confidence
>>>>> in the interface we design.
>>>>>
>>>>> For core point (a): Because aggregates are complex, one
>>>>> LogicalAggregate node may expand to "Expand / Calc / LocalXXAgg / Exchange
>>>>> / FinalXXAgg"
>>>>> in the physical phase. It seems we can't cover all cases with only one
>>>>> rule, so I suggest PushLocalAggIntoTableSourceScanRule focus only
>>>>> on the pattern of TableSourceScan + LocalXXAggregate at present.
>>>>>
>>>>> For core points (b & c): I think we can change the interface to be:
>>>>> ```
>>>>> boolean applyAggregates(
>>>>>     int[] groupingFields,
>>>>>     List<CallExpression> aggregateExpressions,
>>>>>     DataType producedDataType,
>>>>>     List<int[]> groupingSets);
>>>>> ```
>>>>>
>>>>>
>>>>> Simple Group: groupingSets.size() == 1 &&
>>>>> Arrays.equals(groupingSets.get(0), groupingFields)
>>>>>
>>>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>>>> groupingFields.length)
>>>>> Rollup: see org.apache.calcite.rel.core.Aggregate.Group#isRollup
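
The grouping kinds listed above could be told apart roughly as follows. This is only a sketch: `GroupingKindSketch` and its `Kind` enum are hypothetical helpers, not Calcite or Flink code, and the rollup check assumes the grouping sets arrive ordered from the full field list down to the empty set.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that classifies grouping sets; not part of Flink or Calcite. */
final class GroupingKindSketch {
    enum Kind { SIMPLE, ROLLUP, CUBE, OTHER }

    static Kind classify(int[] groupingFields, List<int[]> groupingSets) {
        int n = groupingFields.length;
        // Plain GROUP BY: exactly one grouping set, equal to the grouping fields.
        if (groupingSets.size() == 1 && Arrays.equals(groupingSets.get(0), groupingFields)) {
            return Kind.SIMPLE;
        }
        if (isRollup(groupingFields, groupingSets)) {
            return Kind.ROLLUP;
        }
        // CUBE: every one of the 2^n subsets of the grouping fields appears once.
        if (n < 31 && groupingSets.size() == (1 << n)
                && distinctSubsets(groupingFields, groupingSets) == (1 << n)) {
            return Kind.CUBE;
        }
        return Kind.OTHER;
    }

    /** ROLLUP(a, b) yields (a, b), (a), (): each set is a shrinking prefix of the fields. */
    private static boolean isRollup(int[] fields, List<int[]> sets) {
        if (sets.size() != fields.length + 1) {
            return false;
        }
        for (int i = 0; i < sets.size(); i++) {
            int[] expectedPrefix = Arrays.copyOf(fields, fields.length - i);
            if (!Arrays.equals(sets.get(i), expectedPrefix)) {
                return false;
            }
        }
        return true;
    }

    /** Counts distinct subsets, encoding each set as a bit mask; -1 on an unknown field. */
    private static int distinctSubsets(int[] fields, List<int[]> sets) {
        Set<Integer> masks = new HashSet<>();
        for (int[] set : sets) {
            int mask = 0;
            for (int field : set) {
                int pos = indexOf(fields, field);
                if (pos < 0) {
                    return -1;
                }
                mask |= 1 << pos;
            }
            masks.add(mask);
        }
        return masks.size();
    }

    private static int indexOf(int[] fields, int field) {
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == field) {
                return i;
            }
        }
        return -1;
    }
}
```

With a single grouping field, rollup and cube produce the same two sets; this sketch reports that case as ROLLUP because the rollup check runs first.
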
>>>>>
>>>>> Then we can handle the complex grouping cases. The connector developer
>>>>> of the downstream storage should determine
>>>>> whether it supports the associated grouping type. The filter and
>>>>> having clauses are converted to separate Calc RelNodes
>>>>> and are no longer part of the LocalAggregate node, so the CallExpression may be
>>>>> sufficient to express the semantics of AggregateCall.
>>>>>
>>>>> What do you think? Looking forward to our further discussion.
>>>>>
>>>>>
>>>>> Jingsong Li <[hidden email]> wrote on Wed, Jan 6, 2021 at 2:24 PM:
>>>>>
>>>>>> > I think filter expressions and grouping sets are semantic arguments
>>>>>> instead of utilities. If we want to push them into sources, the connector
>>>>>> developers should be aware of them. Wrapping them in a context implicitly is
>>>>>> error-prone that the existing connector will produce wrong results when
>>>>>> upgrading to new Flink versions.
>>>>>>
>>>>>> We can have some mechanism to check the upgrading.
>>>>>>
>>>>>> > I think for these cases, providing a new default method to override
>>>>>> might be a better choice.
>>>>>>
>>>>>> Then we will have three or more methods. For the API level, I really
>>>>>> don't like it...
>>>>>>
>>>>>> Best,
>>>>>> Jingsong
>>>>>>
>>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <[hidden email]> wrote:
>>>>>>
>>>>>>> I think filter expressions and grouping sets are semantic arguments
>>>>>>> instead of utilities.
>>>>>>> If we want to push them into sources, the connector developers
>>>>>>> should be aware of them.
>>>>>>> Wrapping them in a context implicitly is error-prone: an
>>>>>>> existing connector will produce wrong results
>>>>>>>  when upgrading to new Flink versions (as we are pushing
>>>>>>> grouping_sets/filter_args, but the connector ignores them).
>>>>>>> I think for these cases, providing a new default method to override
>>>>>>> might be a better choice.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <[hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE
>>>>>>>> d > 1)). Can we push it down? I'm not sure that a single call expression
>>>>>>>> can express it, and how we should embody it and convey it to users.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong
>>>>>>>>
>>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Jark,
>>>>>>>>>
>>>>>>>>> I don't want to limit this interface to LocalAgg Push down.
>>>>>>>>> Actually, sometimes, we can push whole aggregation to source too.
>>>>>>>>>
>>>>>>>>> So, this rule can do something more advanced. For example, we can
>>>>>>>>> push down group sets to source too, for the SQL: "GROUP BY GROUPING SETS
>>>>>>>>> (f1, f2)". Then, we need to add more information to push down.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong
>>>>>>>>>
>>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> I think this may be over designed. We should have confidence in
>>>>>>>>>> the interface we design, the interface should be stable.
>>>>>>>>>> Wrapping things in a big context has a cost of losing user
>>>>>>>>>> convenience.
>>>>>>>>>> Foremost, we don't see any parameters to add in the future. Do
>>>>>>>>>> you know any potential parameters?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>>
>>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <[hidden email]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>>
>>>>>>>>>>> Well, I mean:
>>>>>>>>>>>
>>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType producedDataType);`
>>>>>>>>>>> VS
>>>>>>>>>>> ```
>>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>>
>>>>>>>>>>> interface Aggregation {
>>>>>>>>>>>   int[] groupingFields();
>>>>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>>>>   DataType producedDataType();
>>>>>>>>>>> }
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> Maybe I'm overthinking it, but I think Aggregation is a
>>>>>>>>>>> complicated thing. We may need to extend its parameters in the future, so
>>>>>>>>>>> making the parameters an interface allows future expansion
>>>>>>>>>>> without breaking the compatibility of user implementations. With the
>>>>>>>>>>> former signature, users would need to modify their code.
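
The compatibility argument can be sketched as below. The names are hypothetical (`AggregationSketch`, `SimpleAggregationSketch`), and `String` stands in for both `CallExpression` and `DataType` so the snippet compiles on its own. The point is only that an accessor added later as a default method leaves existing implementations compiling.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical parameter object, following the shape suggested in the mail. */
interface AggregationSketch {
    int[] groupingFields();

    List<String> aggregateExpressions();

    String producedDataType();

    /** Imagined later addition; implementations written before it inherit this default. */
    default List<int[]> groupingSets() {
        return Collections.singletonList(groupingFields());
    }
}

/** An implementation written against the original three accessors only. */
final class SimpleAggregationSketch implements AggregationSketch {
    private final int[] groupingFields;
    private final List<String> aggregateExpressions;
    private final String producedDataType;

    SimpleAggregationSketch(
            int[] groupingFields, List<String> aggregateExpressions, String producedDataType) {
        this.groupingFields = groupingFields;
        this.aggregateExpressions = aggregateExpressions;
        this.producedDataType = producedDataType;
    }

    @Override
    public int[] groupingFields() {
        return groupingFields;
    }

    @Override
    public List<String> aggregateExpressions() {
        return aggregateExpressions;
    }

    @Override
    public String producedDataType() {
        return producedDataType;
    }
}
```

The counterpoint raised in this thread still applies: a connector that never reads a newly added accessor keeps compiling but may silently ignore semantics such as grouping sets or filter arguments.
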
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jingsong,
>>>>>>>>>>>>
>>>>>>>>>>>> Thx a lot for your suggestion. These points really need to be
>>>>>>>>>>>> clear in the proposal.
>>>>>>>>>>>>
>>>>>>>>>>>> For the semantic problem, I think the main point is the
>>>>>>>>>>>> different returned data types
>>>>>>>>>>>> for the target aggregate function and the row format returned
>>>>>>>>>>>> by the underlying storage.
>>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>>> Need to let developers know that we need to handle the semantic
>>>>>>>>>>>> differences between
>>>>>>>>>>>> the underlying storage system and Flink in related connectors.
>>>>>>>>>>>> [Supplemented in proposal]
>>>>>>>>>>>>
>>>>>>>>>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule
>>>>>>>>>>>> rule, it's also a key point.
>>>>>>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE
>>>>>>>>>>>> rule set, and better to put it
>>>>>>>>>>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>>>
>>>>>>>>>>>> For the scalability of the interface, actually I don't exactly
>>>>>>>>>>>> understand your suggestion. Is it to add
>>>>>>>>>>>> an abstract class, to implement the SupportsAggregatePushDown
>>>>>>>>>>>> interface, and holds the
>>>>>>>>>>>> `List < CallExpression > aggregateExpressions, int[]
>>>>>>>>>>>> GroupingFields, DataType producedDataType`
>>>>>>>>>>>> fields?
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>>
>>>>>>>>>>>> Jingsong Li <[hidden email]> wrote on Tue, Jan 5, 2021 at 2:44 PM:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your proposal! Sebastian.
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful
>>>>>>>>>>>>> discussion has solved
>>>>>>>>>>>>> many of my concerns.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>>
>>>>>>>>>>>>> We may need to add some mechanisms or comments, because as far
>>>>>>>>>>>>> as I know,
>>>>>>>>>>>>> the semantics of each database is actually different, which
>>>>>>>>>>>>> may need to be
>>>>>>>>>>>>> reflected in your specific implementation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For example, the AVG output types of various databases may be
>>>>>>>>>>>>> different.
>>>>>>>>>>>>> For example, MySQL outputs double, this is different from
>>>>>>>>>>>>> Flink. What
>>>>>>>>>>>>> should we do? (Luckily, AVG will be split into SUM and COUNT,
>>>>>>>>>>>>> but we also
>>>>>>>>>>>>> need to care about decimal and other types.)
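
As a rough illustration of why the SUM/COUNT split sidesteps the AVG type question: if each source split returns a partial sum and count, the final average is computed on the Flink side with a result scale that Flink controls. `PartialAvgSketch` and its `scale` parameter are assumptions for this sketch and do not reflect Flink's actual type derivation rules.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

/** Hypothetical merge of pushed-down partial aggregates into a final AVG. */
final class PartialAvgSketch {

    /** One (sum, count) pair as a source split might return it. */
    static final class Partial {
        final BigDecimal sum;
        final long count;

        Partial(BigDecimal sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /**
     * Merges partial results. The caller chooses the result scale, so the output
     * type does not depend on how the storage system would type AVG itself.
     */
    static BigDecimal finalAvg(List<Partial> partials, int scale) {
        BigDecimal sum = BigDecimal.ZERO;
        long count = 0;
        for (Partial p : partials) {
            sum = sum.add(p.sum);
            count += p.count;
        }
        if (count == 0) {
            return null; // SQL AVG over zero rows is NULL
        }
        return sum.divide(BigDecimal.valueOf(count), scale, RoundingMode.HALF_UP);
    }
}
```

Decimal precision of the partial sums themselves still has to match between the storage system and Flink, which is the remaining concern mentioned above.
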
>>>>>>>>>>>>>
>>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>>
>>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>>> phase, which may
>>>>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>>
>>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>>
>>>>>>>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>>>>>>>> `Aggregate`
>>>>>>>>>>>>> interface, it contains `List<CallExpression>
>>>>>>>>>>>>> aggregateExpressions, int[]
>>>>>>>>>>>>> groupingFields, DataType producedDataType` fields. In this
>>>>>>>>>>>>> way, we can add
>>>>>>>>>>>>> fields easily without breaking compatibility.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the current design is very good, just put forward some
>>>>>>>>>>>>> ideas.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > For (1) Agree: Yeah, the upstream only need to know if the
>>>>>>>>>>>>> TableSource can
>>>>>>>>>>>>> > handle all of the aggregates.
>>>>>>>>>>>>> > It's better to just return a boolean type to indicate
>>>>>>>>>>>>> whether all of
>>>>>>>>>>>>> > aggregates push down was successful or not. [Resolved in
>>>>>>>>>>>>> proposal]
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > For (2) Agree: The aggOutputDataType represent the produced
>>>>>>>>>>>>> data type of
>>>>>>>>>>>>> > the new table source to make sure that the new table source
>>>>>>>>>>>>> can
>>>>>>>>>>>>> > connect with the related exchange node. The format of this
>>>>>>>>>>>>> > aggOutputDataType is groupedFields's type + agg function's
>>>>>>>>>>>>> return type.
>>>>>>>>>>>>> > The reason for adding this parameter in this function is
>>>>>>>>>>>>> also to facilitate
>>>>>>>>>>>>> > the user to build the final output type. I have changed this
>>>>>>>>>>>>> parameter
>>>>>>>>>>>>> > to be producedDataType. [Resolved in proposal]
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have
>>>>>>>>>>>>> changed to use
>>>>>>>>>>>>> > groupingFields. [Resolved in proposal]
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thx again for the suggestion, looking forward to further
>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Jark Wu <[hidden email]> wrote on Tue, Jan 5, 2021 at 12:05 PM:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Regarding to the updated interface,
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression>
>>>>>>>>>>>>> aggregateExpressions,
>>>>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>>> > >        private final List<CallExpression>
>>>>>>>>>>>>> acceptedAggregates;
>>>>>>>>>>>>> > >        private final List<CallExpression>
>>>>>>>>>>>>> remainingAggregates;
>>>>>>>>>>>>> > > }
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > I have following comments:
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a
>>>>>>>>>>>>> boolean return type
>>>>>>>>>>>>> > > enough?
>>>>>>>>>>>>> > >     From my understanding, all of the aggregates should be
>>>>>>>>>>>>> accepted,
>>>>>>>>>>>>> > > otherwise the pushdown should fail.
>>>>>>>>>>>>> > >     Therefore, users don't need to distinguish which
>>>>>>>>>>>>> aggregates are
>>>>>>>>>>>>> > > "accepted".
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced
>>>>>>>>>>>>> data type of the
>>>>>>>>>>>>> > > new source, or just the return type of all the agg
>>>>>>>>>>>>> functions?
>>>>>>>>>>>>> > >     I would prefer to `producedDataType` just like
>>>>>>>>>>>>> > > `SupportsReadingMetadata` to reduce the effort for users
>>>>>>>>>>>>> to concat a
>>>>>>>>>>>>> > final
>>>>>>>>>>>>> > > output type.
>>>>>>>>>>>>> > >     The return type of each agg function can be obtained
>>>>>>>>>>>>> from the
>>>>>>>>>>>>> > > `CallExpression`.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to
>>>>>>>>>>>>> `grouping` or
>>>>>>>>>>>>> > > `groupedFields` ?
>>>>>>>>>>>>> > >     The `groupSet` may confuse users that it relates to
>>>>>>>>>>>>> "grouping sets".
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Best,
>>>>>>>>>>>>> > > Jark
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <[hidden email]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>> > >> wrote:
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> Thanks a lot for your feedback. If local aggregation is
>>>>>>>>>>>>> > >>> more like a physical operator than a logical operator,
>>>>>>>>>>>>> > >>> I think your suggestion should be idea #2, which handles
>>>>>>>>>>>>> > >>> everything in the physical optimization phase?
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> Looking forward to further discussion.
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> On Tue, Jan 5, 2021 at 9:52 AM Kurt Young <[hidden email]> wrote:
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator
>>>>>>>>>>>>> rather than logical
>>>>>>>>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>> > >>>> wrote:
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>> > Hi Jark, thanks a lot for your quick reply and valuable
>>>>>>>>>>>>> > >>>> > suggestions.
>>>>>>>>>>>>> > >>>> > For (1): Agree. Since we are in the middle of upgrading
>>>>>>>>>>>>> > >>>> > to the new table source API, we really should use the
>>>>>>>>>>>>> > >>>> > new interface for the new optimizer rule. If the new
>>>>>>>>>>>>> > >>>> > rule doesn't use the new API, we'll have to upgrade it
>>>>>>>>>>>>> > >>>> > sooner or later. I have changed the proposal above to
>>>>>>>>>>>>> > >>>> > define SupportsAggregatePushDown as an ability interface.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > For (2): Agree. Using CallExpression is a better choice,
>>>>>>>>>>>>> > >>>> > and I have resolved this comment in the proposal.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC connector,
>>>>>>>>>>>>> > >>>> > as we don't have a Druid connector and the ES connector
>>>>>>>>>>>>> > >>>> > only has a sink API at present.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > But perhaps the biggest question is whether we should
>>>>>>>>>>>>> > >>>> > use idea 1 or idea 2 in the proposal.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > What do you think? After we reach agreement on the
>>>>>>>>>>>>> > >>>> > proposal, our team can drive this feature to completion.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > On Tue, Dec 29, 2020 at 2:58 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>>>>>>>>>> improvement for
>>>>>>>>>>>>> > >>>> Flink
>>>>>>>>>>>>> > >>>> > SQL.
>>>>>>>>>>>>> > >>>> > > I went through the design doc and have following
>>>>>>>>>>>>> thoughts:
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in
>>>>>>>>>>>>> 1.11 and
>>>>>>>>>>>>> > proposed
>>>>>>>>>>>>> > >>>> a new
>>>>>>>>>>>>> > >>>> > >  set of DynamicTableSource interfaces. Could you
>>>>>>>>>>>>> update your
>>>>>>>>>>>>> > >>>> proposal to
>>>>>>>>>>>>> > >>>> > > use the new interfaces?
>>>>>>>>>>>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a
>>>>>>>>>>>>> better
>>>>>>>>>>>>> > >>>> representation
>>>>>>>>>>>>> > >>>> > than
>>>>>>>>>>>>> > >>>> > > separate `FunctionDefinition` and args. Because, it
>>>>>>>>>>>>> would be
>>>>>>>>>>>>> > easier
>>>>>>>>>>>>> > >>>> to
>>>>>>>>>>>>> > >>>> > know
>>>>>>>>>>>>> > >>>> > > what's the index and type of the arguments.
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > 3) It would be better to list which connectors will
>>>>>>>>>>>>> be supported
>>>>>>>>>>>>> > in
>>>>>>>>>>>>> > >>>> the
>>>>>>>>>>>>> > >>>> > > plan?
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>>>>>>>>>> > [hidden email]>
>>>>>>>>>>>>> > >>>> > wrote:
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink
>>>>>>>>>>>>> > >>>> > > > planner.
>>>>>>>>>>>>> > >>>> > > > Aggregation in Flink SQL is currently performed
>>>>>>>>>>>>> > >>>> > > > entirely in the Flink layer.
>>>>>>>>>>>>> > >>>> > > > As storage systems have evolved, many of the
>>>>>>>>>>>>> > >>>> > > > storage systems that Flink SQL reads from can now
>>>>>>>>>>>>> > >>>> > > > perform aggregation themselves.
>>>>>>>>>>>>> > >>>> > > > Pushing the aggregate down to the data source layer
>>>>>>>>>>>>> > >>>> > > > will improve performance by reducing network IO and
>>>>>>>>>>>>> > >>>> > > > computation overhead.
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> >
>>>>>>>>>>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>>> > >>>> > > >

--

*With kind regards
------------------------------------------------------------
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: [hidden email] <[hidden email]>
QQ: 3239559*

Re: Support local aggregate push down for Blink batch planner

Jark Wu-2
Hi Sebastian,

I assigned the issue to you. However, I suggest creating sub-tasks under
this issue, because I think this will be a big contribution.
For example, you can split it into:
1. Introduce SupportsAggregatePushDown interface
2. Support SupportsAggregatePushDown in planner
3. Support SupportsAggregatePushDown for JDBC source
4. ...

Best,
Jark
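
As a rough illustration of sub-task 1, the interface agreed on in the quoted thread below could be sketched as follows. This is a minimal, self-contained sketch and not Flink code: `DataType` and `AggregateExpression` are reduced to hypothetical stand-ins so that the example compiles on its own, and `CountSumOnlySource` is an invented connector that accepts only a single grouping set with non-distinct, unfiltered COUNT/SUM.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AggregatePushDownSketch {

    /** Hypothetical stand-in for Flink's DataType; it only carries a description. */
    static final class DataType {
        final String description;

        DataType(String description) {
            this.description = description;
        }
    }

    /** Simplified stand-in for the AggregateExpression proposed in the thread. */
    static final class AggregateExpression {
        final String functionName;  // stands in for FunctionDefinition
        final List<Integer> args;   // stands in for List<FieldReferenceExpression>
        final boolean hasFilter;    // stands in for the nullable filter expression
        final boolean distinct;

        AggregateExpression(
                String functionName, List<Integer> args, boolean hasFilter, boolean distinct) {
            this.functionName = functionName;
            this.args = args;
            this.hasFilter = hasFilter;
            this.distinct = distinct;
        }
    }

    /** Shape of the ability interface as proposed in the thread. */
    interface SupportsAggregatePushDown {
        boolean applyAggregates(
                List<int[]> groupingSets,
                List<AggregateExpression> aggregates,
                DataType producedDataType);
    }

    /** Invented example source: accepts only simple GROUP BY with plain COUNT/SUM. */
    static final class CountSumOnlySource implements SupportsAggregatePushDown {
        private static final Set<String> SUPPORTED =
                new HashSet<>(Arrays.asList("COUNT", "SUM"));
        private boolean pushedDown;

        @Override
        public boolean applyAggregates(
                List<int[]> groupingSets,
                List<AggregateExpression> aggregates,
                DataType producedDataType) {
            // All-or-nothing: reject the whole push-down if anything is unsupported.
            if (groupingSets.size() != 1) {
                return false; // GROUPING SETS / ROLLUP / CUBE are not handled here
            }
            for (AggregateExpression agg : aggregates) {
                if (agg.distinct || agg.hasFilter || !SUPPORTED.contains(agg.functionName)) {
                    return false;
                }
            }
            pushedDown = true; // a real source would remember the aggregates for its scan
            return true;
        }

        boolean isPushedDown() {
            return pushedDown;
        }
    }

    public static void main(String[] args) {
        DataType produced = new DataType("ROW<cc_class STRING, cnt BIGINT>");
        List<int[]> simpleGroup = Collections.singletonList(new int[] {0});
        AggregateExpression count =
                new AggregateExpression("COUNT", Collections.singletonList(1), false, false);
        AggregateExpression filteredCount =
                new AggregateExpression("COUNT", Collections.singletonList(1), true, false);

        System.out.println(new CountSumOnlySource()
                .applyAggregates(simpleGroup, Collections.singletonList(count), produced));
        System.out.println(new CountSumOnlySource()
                .applyAggregates(simpleGroup, Collections.singletonList(filteredCount), produced));
    }
}
```

Returning a plain boolean mirrors the all-or-nothing semantics discussed in the quoted thread: either the source takes over every aggregate, or the planner keeps the local aggregation itself. In this sketch the first call is accepted and the second (a filtered COUNT) is rejected.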

On Thu, 7 Jan 2021 at 23:27, Sebastian Liu <[hidden email]> wrote:

> Hi Jark,
>
> It seems that we have reached agreement on the proposal. Could you
> please assign the Jira ticket below to me?
> https://issues.apache.org/jira/browse/FLINK-20791
>
> On Thu, Jan 7, 2021 at 10:25 AM Jark Wu <[hidden email]> wrote:
>
>> Thanks for updating the design doc.
>> It looks good to me.
>>
>> Best,
>> Jark
>>
>> On Thu, 7 Jan 2021 at 10:16, Jingsong Li <[hidden email]> wrote:
>>
>>> Sounds good to me.
>>>
>>> We don't have to worry about future changes, because it has covered all
>>> the capabilities of calcite aggregation.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <[hidden email]>
>>> wrote:
>>>
>>>> Hi Jark,
>>>>
>>>> Sounds good to me. For better extensibility in the future, we could add
>>>> the AggregateExpression.
>>>> ```
>>>> public class AggregateExpression implements ResolvedExpression {
>>>>    private final FunctionDefinition functionDefinition;
>>>>    private final List<FieldReferenceExpression> args;
>>>>    private final @Nullable CallExpression filterExpression;
>>>>    private final DataType resultType;
>>>>    private final boolean distinct;
>>>>    private final boolean approximate;
>>>>    private final boolean ignoreNulls;
>>>> }
>>>> ```
>>>>
>>>> And we really only need one GroupingSets parameter for grouping. I have
>>>> updated the related interface in the proposal.
>>>> Appreciate the continued feedback and help.
>>>>
>>>> On Wed, Jan 6, 2021 at 9:34 PM Jark Wu <[hidden email]> wrote:
>>>>
>>>>> Hi Liu, Jingsong,
>>>>>
>>>>> Regarding the agg with filter, I think in theory we can support
>>>>> pushing such a pattern into source.
>>>>> We don't need to support it in the first version, but in the long
>>>>> term, we can support it.
>>>>> The designed interface should be future proof.
>>>>>
>>>>> The filter arg and distinct flag should be part of the aggregate
>>>>> expression, so I'm wondering whether CallExpression is a good
>>>>> representation for it.
>>>>> What do you think about proposing the following `AggregateExpression`
>>>>> to replace the `CallExpression`?
>>>>>
>>>>> class AggregateExpression implements ResolvedExpression {
>>>>>     private final FunctionDefinition functionDefinition;
>>>>>     private final List<FieldReferenceExpression> args;
>>>>>     private final @Nullable CallExpression filterExpr;
>>>>>     private final boolean distinct;
>>>>> }
>>>>>
>>>>> Besides, we don't need both groupingFields and groupingSets.
>>>>> `groupingSets` should be a superset of groupingFields.
>>>>> Then the interface of SupportsAggregatePushDown can be:
>>>>>
>>>>> interface SupportsAggregatePushDown {
>>>>>
>>>>>   boolean applyAggregates(
>>>>>     List<int[]> groupingSets,
>>>>>     List<AggregateExpression> aggregates,
>>>>>     DataType producedDataType);
>>>>> }
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi Jingsong, Jark,
>>>>>>
>>>>>> Thanks so much for the discussion; the cases mentioned above are
>>>>>> well worth discussing further.
>>>>>>
>>>>>> 1. For aggregates with filter expressions, e.g.: select COUNT(1)
>>>>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg
>>>>>> -> Exchange -> FinalAgg.
>>>>>> As there is a Calc above the TableSource, this pattern can't match
>>>>>> the LocalAggPushDownRule in the current design.
>>>>>>
>>>>>> 2. For the grouping sets or rollup use case, e.g.: select COUNT(1) from
>>>>>> call_center group by rollup(cc_class, cc_employees);
>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
>>>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>>>
>>>>>> 3. I want to add a case which we haven't discussed yet:
>>>>>> aggregate with a HAVING clause,
>>>>>> e.g.: select COUNT(1) from call_center group by cc_class having
>>>>>> max(cc_tax_percentage) > 0.2;
>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>>>
>>>>>> The core discussion points are summarized as follows:
>>>>>> a) Aggregate is a more complex scenario than predicates or limits,
>>>>>> and also depends on the different underlying storages.
>>>>>> b) A single rule cannot cover every aggregate scenario, but can the
>>>>>> SupportsAggregatePushDown interface be made a bit more general?
>>>>>> c) Can CallExpression express the semantics of Calcite's
>>>>>> AggregateCall?
>>>>>>
>>>>>> IMO: Pushing down the complete aggregate is generally hard for
>>>>>> distributed systems. Usually it requires a GROUP BY that exactly
>>>>>> matches the partitioning in the underlying storage. At the same time,
>>>>>> the benefit of removing the final aggregate is actually limited.
>>>>>> The LocalAggPushDown generally yields more than 80% of the CPU and IO
>>>>>> benefits. But I also agree that
>>>>>> the SupportsAggregatePushDown interface should be as generic as
>>>>>> possible for future extensions, so that we can have confidence
>>>>>> in the interface we design.
>>>>>>
>>>>>> For core point (a): Because of the complexity of aggregates, one
>>>>>> LogicalAggregate node may expand to "Expand / Calc / LocalXXAgg /
>>>>>> Exchange / FinalXXAgg"
>>>>>> in the physical phase. It seems that we can't solve all cases with
>>>>>> only one rule, so I suggest PushLocalAggIntoTableSourceScanRule focus
>>>>>> only on the pattern of TableSourceScan + LocalXXAggregate at present.
>>>>>>
>>>>>> For core points (b & c): I think we can change the interface to be:
>>>>>> ```
>>>>>>
>>>>>> boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>>> aggregateExpressions, DataType producedDataType, List<int[]>
>>>>>> groupingSets);
>>>>>>
>>>>>> ```
>>>>>>
>>>>>>
>>>>>> Simple Group: groupingSets.size() == 1 &&
>>>>>> Arrays.equals(groupingSets.get(0), groupingFields)
>>>>>>
>>>>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>>>>> groupingFields.length)
>>>>>> Rollup: See org.apache.calcite.rel.core.Aggregate.Group#isRollup
>>>>>>
>>>>>> Then we can handle the complex grouping cases. The connector developer
>>>>>> for the underlying storage should determine
>>>>>> whether it supports the associated grouping type. The filter and
>>>>>> HAVING clauses are converted into separate Calc RelNodes
>>>>>> and are no longer part of the LocalAggregate node, so CallExpression may be
>>>>>> sufficient to express the semantics of AggregateCall.
>>>>>>
>>>>>> What do you think? Looking forward to our further discussion.
>>>>>>
>>>>>>
>>>>>> On Wed, Jan 6, 2021 at 2:24 PM Jingsong Li <[hidden email]> wrote:
>>>>>>
>>>>>>> > I think filter expressions and grouping sets are semantic
>>>>>>> arguments instead of utilities. If we want to push them into sources, the
>>>>>>> connector developers should be aware of them. Wrapping them in a context
>>>>>>> implicitly is error-prone: existing connectors will produce wrong
>>>>>>> results when upgrading to new Flink versions.
>>>>>>>
>>>>>>> We can have some mechanism to check the upgrading.
>>>>>>>
>>>>>>> > I think for these cases, providing a new default method to
>>>>>>> override might be a better choice.
>>>>>>>
>>>>>>> Then we will have three or more methods. For the API level, I really
>>>>>>> don't like it...
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong
>>>>>>>
>>>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <[hidden email]> wrote:
>>>>>>>
>>>>>>>> I think filter expressions and grouping sets are semantic arguments
>>>>>>>> instead of utilities.
>>>>>>>> If we want to push them into sources, the connector developers
>>>>>>>> should be aware of them.
>>>>>>>> Wrapping them in a context implicitly is error-prone: existing
>>>>>>>> connectors will produce wrong results
>>>>>>>> when upgrading to new Flink versions (as we are pushing
>>>>>>>> grouping_sets/filter_args, but the connector ignores them).
>>>>>>>> I think for these cases, providing a new default method to override
>>>>>>>> might be a better choice.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm also curious about aggregate with filter (COUNT(1)
>>>>>>>>> FILTER(WHERE d > 1)). Can we push it down? I'm not sure that a single call
>>>>>>>>> expression can express it, and how we should embody it and convey it to
>>>>>>>>> users.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong
>>>>>>>>>
>>>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <[hidden email]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jark,
>>>>>>>>>>
>>>>>>>>>> I don't want to limit this interface to LocalAgg Push down.
>>>>>>>>>> Actually, sometimes, we can push whole aggregation to source too.
>>>>>>>>>>
>>>>>>>>>> So, this rule can do something more advanced. For example, we can
>>>>>>>>>> push down group sets to source too, for the SQL: "GROUP BY GROUPING SETS
>>>>>>>>>> (f1, f2)". Then, we need to add more information to push down.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think this may be over-designed. We should have confidence in
>>>>>>>>>>> the interface we design; the interface should be stable.
>>>>>>>>>>> Wrapping things in a big context comes at the cost of user
>>>>>>>>>>> convenience.
>>>>>>>>>>> Above all, we don't see any parameters to add in the future. Do
>>>>>>>>>>> you know of any potential parameters?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jark
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <[hidden email]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>>>
>>>>>>>>>>>> Well, I mean:
>>>>>>>>>>>>
>>>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType producedDataType);`
>>>>>>>>>>>> VS
>>>>>>>>>>>> ```
>>>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>>>
>>>>>>>>>>>> interface Aggregation {
>>>>>>>>>>>>   int[] groupingFields();
>>>>>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>>>>>   DataType producedDataType();
>>>>>>>>>>>> }
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> Maybe I'm overthinking it, but I think Aggregation is a
>>>>>>>>>>>> complicated thing. We may need to extend its parameters in the future, so
>>>>>>>>>>>> wrapping the parameters in an interface allows future extension
>>>>>>>>>>>> without breaking the compatibility of user implementations. With the
>>>>>>>>>>>> former approach, users would need to modify their code.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jingsong,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thx a lot for your suggestion. These points really need to be
>>>>>>>>>>>>> clear in the proposal.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the semantic problem, I think the main point is the
>>>>>>>>>>>>> different returned data types
>>>>>>>>>>>>> for the target aggregate function and the row format returned
>>>>>>>>>>>>> by the underlying storage.
>>>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>>>> We need to let developers know that connectors must handle the
>>>>>>>>>>>>> semantic differences between
>>>>>>>>>>>>> the underlying storage system and Flink.
>>>>>>>>>>>>> [Supplemented in proposal]
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule
>>>>>>>>>>>>> rule, it's also a key point.
>>>>>>>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE
>>>>>>>>>>>>> rule set, preferably
>>>>>>>>>>>>> after the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the extensibility of the interface, I don't fully
>>>>>>>>>>>>> understand your suggestion. Is it to add
>>>>>>>>>>>>> an abstract class that implements the SupportsAggregatePushDown
>>>>>>>>>>>>> interface and holds the
>>>>>>>>>>>>> `List<CallExpression> aggregateExpressions, int[]
>>>>>>>>>>>>> groupingFields, DataType producedDataType`
>>>>>>>>>>>>> fields?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 2:44 PM Jingsong Li <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your proposal, Sebastian!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful
>>>>>>>>>>>>>> discussion has solved
>>>>>>>>>>>>>> many of my concerns.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We may need to add some mechanisms or comments, because as
>>>>>>>>>>>>>> far as I know,
>>>>>>>>>>>>>> the semantics of each database is actually different, which
>>>>>>>>>>>>>> may need to be
>>>>>>>>>>>>>> reflected in your specific implementation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example, the AVG output types of various databases may
>>>>>>>>>>>>>> differ.
>>>>>>>>>>>>>> MySQL, for instance, outputs double, which is different from
>>>>>>>>>>>>>> Flink. What
>>>>>>>>>>>>>> should we do? (Luckily, avg will be split into sum and
>>>>>>>>>>>>>> count, but we also
>>>>>>>>>>>>>> need to take care of decimal and others.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>>>> phase, which may
>>>>>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For extensibility, I slightly recommend that we introduce an
>>>>>>>>>>>>>> `Aggregate`
>>>>>>>>>>>>>> interface that contains the `List<CallExpression>
>>>>>>>>>>>>>> aggregateExpressions, int[]
>>>>>>>>>>>>>> groupingFields, DataType producedDataType` fields. In this
>>>>>>>>>>>>>> way, we can add
>>>>>>>>>>>>>> fields easily without breaking compatibility.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think the current design is very good; I'm just putting
>>>>>>>>>>>>>> forward some ideas.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thanks for your further feedback and help. The interface of
>>>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > For (1) Agree: Yeah, the upstream only needs to know whether the
>>>>>>>>>>>>>> > TableSource can handle all of the aggregates.
>>>>>>>>>>>>>> > It's better to just return a boolean to indicate whether the
>>>>>>>>>>>>>> > push-down of all aggregates was successful. [Resolved in
>>>>>>>>>>>>>> > proposal]
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > For (2) Agree: The aggOutputDataType represents the produced
>>>>>>>>>>>>>> > data type of the new table source, to make sure that the new
>>>>>>>>>>>>>> > table source can connect with the related exchange node. The
>>>>>>>>>>>>>> > format of this aggOutputDataType is the groupedFields' types
>>>>>>>>>>>>>> > followed by the agg functions' return types.
>>>>>>>>>>>>>> > This parameter was also added to make it easier for the user
>>>>>>>>>>>>>> > to build the final output type. I have renamed this parameter
>>>>>>>>>>>>>> > to producedDataType. [Resolved in proposal]
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users; I have
>>>>>>>>>>>>>> > changed it to groupingFields. [Resolved in proposal]
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thanks again for the suggestions; looking forward to further
>>>>>>>>>>>>>> > discussion.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On Tue, Jan 5, 2021 at 12:05 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Regarding the updated interface,
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>>>>>>>>>>>> > >        private final List<CallExpression> remainingAggregates;
>>>>>>>>>>>>>> > > }
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > I have the following comments:
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a
>>>>>>>>>>>>>> boolean return type
>>>>>>>>>>>>>> > > enough?
>>>>>>>>>>>>>> > >     From my understanding, all of the aggregates should
>>>>>>>>>>>>>> be accepted,
>>>>>>>>>>>>>> > > otherwise the pushdown should fail.
>>>>>>>>>>>>>> > >     Therefore, users don't need to distinguish which
>>>>>>>>>>>>>> aggregates are
>>>>>>>>>>>>>> > > "accepted".
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced
>>>>>>>>>>>>>> data type of the
>>>>>>>>>>>>>> > > new source, or just the return type of all the agg
>>>>>>>>>>>>>> functions?
>>>>>>>>>>>>>> > >     I would prefer `producedDataType`, just like
>>>>>>>>>>>>>> > > `SupportsReadingMetadata`, to reduce the effort for users
>>>>>>>>>>>>>> > > to concatenate a final output type.
>>>>>>>>>>>>>> > >     The return type of each agg function can be obtained
>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>> > > `CallExpression`.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to
>>>>>>>>>>>>>> `grouping` or
>>>>>>>>>>>>>> > > `groupedFields` ?
>>>>>>>>>>>>>> > >     The `groupSet` may confuse users that it relates to
>>>>>>>>>>>>>> "grouping sets".
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Best,
>>>>>>>>>>>>>> > > Jark
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <[hidden email]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>> > >> wrote:
>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>> Thanks a lot for your feedback. If local aggregation is
>>>>>>>>>>>>>> > >>> more like a physical operator than a logical operator,
>>>>>>>>>>>>>> > >>> I think your suggestion should be idea #2, which handles
>>>>>>>>>>>>>> > >>> everything in the physical optimization phase?
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>> Looking forward to further discussion.
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>> On Tue, Jan 5, 2021 at 9:52 AM Kurt Young <[hidden email]> wrote:
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator
>>>>>>>>>>>>>> rather than logical
>>>>>>>>>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>> > >>>> wrote:
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> > >>>> > Hi Jark, thanks a lot for your quick reply and valuable
>>>>>>>>>>>>>> > >>>> > suggestions.
>>>>>>>>>>>>>> > >>>> > For (1): Agree. Since we are in the middle of upgrading
>>>>>>>>>>>>>> > >>>> > to the new table source API, we really should use the
>>>>>>>>>>>>>> > >>>> > new interface for the new optimizer rule. If the new
>>>>>>>>>>>>>> > >>>> > rule doesn't use the new API, we'll have to upgrade it
>>>>>>>>>>>>>> > >>>> > sooner or later. I have changed the proposal above to
>>>>>>>>>>>>>> > >>>> > define SupportsAggregatePushDown as an ability interface.
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > For (2): Agree. Using CallExpression is a better choice,
>>>>>>>>>>>>>> > >>>> > and I have resolved this comment in the proposal.
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC connector,
>>>>>>>>>>>>>> > >>>> > as we don't have a Druid connector and the ES connector
>>>>>>>>>>>>>> > >>>> > only has a sink API at present.
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > But perhaps the biggest question is whether we should
>>>>>>>>>>>>>> > >>>> > use idea 1 or idea 2 in the proposal.
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > What do you think? After we reach agreement on the
>>>>>>>>>>>>>> > >>>> > proposal, our team can drive this feature to completion.
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > On Tue, Dec 29, 2020 at 2:58 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great improvement
>>>>>>>>>>>>>> > >>>> > > for Flink SQL.
>>>>>>>>>>>>>> > >>>> > > I went through the design doc and have the following thoughts:
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
>>>>>>>>>>>>>> > >>>> > > proposed a new set of DynamicTableSource interfaces. Could you
>>>>>>>>>>>>>> > >>>> > > update your proposal to use the new interfaces?
>>>>>>>>>>>>>> > >>>> > > Follow the existing ability interfaces, e.g.
>>>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a better
>>>>>>>>>>>>>> > >>>> > > representation than separate `FunctionDefinition` and args,
>>>>>>>>>>>>>> > >>>> > > because it would be easier to know the index and type of the
>>>>>>>>>>>>>> > >>>> > > arguments.
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > 3) It would be better to list which connectors will be
>>>>>>>>>>>>>> > >>>> > > supported in the plan.
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>>>>>>>>>>> > [hidden email]>
>>>>>>>>>>>>>> > >>>> > wrote:
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>>>>>>>>>>>>> > >>>> > > > The aggregate operator of Flink SQL is currently executed
>>>>>>>>>>>>>> > >>>> > > > entirely in the Flink layer.
>>>>>>>>>>>>>> > >>>> > > > As storage systems have developed, many downstream storage
>>>>>>>>>>>>>> > >>>> > > > systems of Flink SQL can now handle the aggregation operator
>>>>>>>>>>>>>> > >>>> > > > themselves.
>>>>>>>>>>>>>> > >>>> > > > Pushing the aggregate down to the data source layer will
>>>>>>>>>>>>>> > >>>> > > > improve performance in terms of network IO and computation
>>>>>>>>>>>>>> > >>>> > > > overhead.
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese
>>>>>>>>>>>>>> Academy of Science
>>>>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>> > >>>> > > > E-mail: [hidden email] <
>>>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> >

Re: Support local aggregate push down for Blink batch planner

Sebastian Liu
Hi Jark,

Cool. Following your suggestions, I have created three related subtasks
under FLINK-20791.
Could you please assign these subtasks to me as well when you have time? I
will then push the relevant implementation forward.

On Fri, Jan 8, 2021 at 12:30 PM Jark Wu <[hidden email]> wrote:

> Hi Sebastian,
>
> I assigned the issue to you. But I suggest creating sub-tasks under this
> issue. Because I think this would be a big contribution.
> For example, you can split it into:
> 1. Introduce SupportsAggregatePushDown interface
> 2. Support SupportsAggregatePushDown in planner
> 3. Support SupportsAggregatePushDown for JDBC source
> 4. ...
>
> Best,
> Jark
>
> On Thu, 7 Jan 2021 at 23:27, Sebastian Liu <[hidden email]> wrote:
>
>> Hi Jark,
>>
>> It seems that we have reached agreement on the proposal. Could you
>> please help assign the Jira ticket below to me?
>> https://issues.apache.org/jira/browse/FLINK-20791
>>
>> On Thu, Jan 7, 2021 at 10:25 AM Jark Wu <[hidden email]> wrote:
>>
>>> Thanks for updating the design doc.
>>> It looks good to me.
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 7 Jan 2021 at 10:16, Jingsong Li <[hidden email]> wrote:
>>>
>>>> Sounds good to me.
>>>>
>>>> We don't have to worry about future changes, because it has covered all
>>>> the capabilities of calcite aggregation.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <[hidden email]>
>>>> wrote:
>>>>
>>>>> Hi Jark,
>>>>>
>>>>> Sounds good to me. For better scalability in the future, we could add
>>>>> the AggregateExpression.
>>>>> ```
>>>>> public class AggregateExpression implements ResolvedExpression {
>>>>>    private final FunctionDefinition functionDefinition;
>>>>>    private final List<FieldReferenceExpression> args;
>>>>>    private final @Nullable CallExpression filterExpression;
>>>>>    private final DataType resultType;
>>>>>    private final boolean distinct;
>>>>>    private final boolean approximate;
>>>>>    private final boolean ignoreNulls;
>>>>> }
>>>>> ```
>>>>>
>>>>> And we really only need one GroupingSets parameter for grouping. I
>>>>> have updated the related interface in the proposal.
>>>>> Appreciate the continued feedback and help.
>>>>>
>>>>> On Wed, Jan 6, 2021 at 9:34 PM Jark Wu <[hidden email]> wrote:
>>>>>
>>>>>> Hi Liu, Jingsong,
>>>>>>
>>>>>> Regarding the agg with filter, I think in theory we can support
>>>>>> pushing such a pattern into source.
>>>>>> We don't need to support it in the first version, but in the long
>>>>>> term, we can support it.
>>>>>> The designed interface should be future proof.
>>>>>>
>>>>>> Considering that the filter arg and the distinct flag should be part
>>>>>> of the aggregate expression,
>>>>>> I'm wondering whether CallExpression is a good representation for it.
>>>>>> What do you think about proposing the following `AggregateExpression`
>>>>>> to replace the `CallExpression`?
>>>>>>
>>>>>> class AggregateExpression implements ResolvedExpression {
>>>>>>     private final FunctionDefinition functionDefinition;
>>>>>>     private final List<FieldReferenceExpression> args;
>>>>>>     private final @Nullable CallExpression filterExpr;
>>>>>>     private final boolean distinct;
>>>>>> }
>>>>>>
>>>>>> Besides, we don't need both groupingFields and groupingSets.
>>>>>> `groupingSets` should be a superset of groupingFields.
>>>>>> Then the interface of SupportsAggregatePushDown can be:
>>>>>>
>>>>>> interface SupportsAggregatePushDown {
>>>>>>
>>>>>>   boolean applyAggregates(
>>>>>>     List<int[]> groupingSets,
>>>>>>     List<AggregateExpression> aggregates,
>>>>>>     DataType producedDataType);
>>>>>> }
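To make the all-or-nothing contract of the interface above concrete, here is a minimal sketch of how a connector might decide whether to accept a push-down. It does not use the real Flink classes: `AggCallSketch` and `AggregatePushDownSketch` are simplified, hypothetical stand-ins for the proposed `AggregateExpression` and `SupportsAggregatePushDown`, the `producedDataType` parameter is left out, and the set of supported functions is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified, hypothetical stand-in for the proposed AggregateExpression. */
final class AggCallSketch {
    final String functionName;       // stands in for FunctionDefinition
    final List<Integer> argIndexes;  // stands in for List<FieldReferenceExpression>
    final boolean hasFilter;         // true if a FILTER(WHERE ...) is attached
    final boolean distinct;

    AggCallSketch(String functionName, List<Integer> argIndexes,
                  boolean hasFilter, boolean distinct) {
        this.functionName = functionName;
        this.argIndexes = argIndexes;
        this.hasFilter = hasFilter;
        this.distinct = distinct;
    }
}

/** Hypothetical stand-in for the proposed ability interface (no producedDataType). */
interface AggregatePushDownSketch {
    boolean applyAggregates(List<int[]> groupingSets, List<AggCallSketch> aggregates);
}

/** Hypothetical source that only accepts plain aggregates over a single grouping set. */
final class JdbcLikeSourceSketch implements AggregatePushDownSketch {
    // Assumed capability list, chosen for illustration only.
    private static final Set<String> SUPPORTED =
            new HashSet<>(Arrays.asList("COUNT", "SUM", "MIN", "MAX"));

    private int[] pushedGrouping;                  // null until a push-down is accepted
    private List<AggCallSketch> pushedAggregates;  // null until a push-down is accepted

    @Override
    public boolean applyAggregates(List<int[]> groupingSets, List<AggCallSketch> aggregates) {
        // GROUPING SETS / ROLLUP / CUBE arrive as more than one grouping set: decline.
        if (groupingSets.size() != 1) {
            return false;
        }
        // Either every aggregate is accepted, or the whole push-down is declined.
        for (AggCallSketch agg : aggregates) {
            if (agg.hasFilter || agg.distinct || !SUPPORTED.contains(agg.functionName)) {
                return false;
            }
        }
        // State is only changed once the whole request is known to be acceptable.
        pushedGrouping = groupingSets.get(0).clone();
        pushedAggregates = new ArrayList<>(aggregates);
        return true;
    }

    boolean hasPushedAggregates() {
        return pushedAggregates != null;
    }

    int[] getPushedGrouping() {
        return pushedGrouping;
    }

    public static void main(String[] args) {
        JdbcLikeSourceSketch source = new JdbcLikeSourceSketch();
        List<int[]> oneSet = Arrays.asList(new int[] {0});
        AggCallSketch count = new AggCallSketch("COUNT", Arrays.asList(1), false, false);
        System.out.println(source.applyAggregates(oneSet, Arrays.asList(count)));
    }
}
```

Returning `false` without touching any state keeps the planner free to fall back to the regular LocalAgg plan, which matches the boolean return type agreed on earlier in the thread.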
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jingsong, Jark,
>>>>>>>
>>>>>>> Thanks so much for the discussion; the cases mentioned above are
>>>>>>> really worth discussing further.
>>>>>>>
>>>>>>> 1. For aggregate with filter expressions: eg: select COUNT(1)
>>>>>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) ->
>>>>>>> LocalAgg -> Exchange -> FinalAgg.
>>>>>>> As there is a Calc above the TableSource, this pattern can't match
>>>>>>> the LocalAggPushDownRule in the current design.
>>>>>>>
>>>>>>> 2. For the grouping set or rollup use case: eg: select COUNT(1) from
>>>>>>> call_center group by rollup(cc_class, cc_employees);
>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>>> Calc.
>>>>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>>>>
>>>>>>> 3. I want to add a case which we haven't discussed yet.
>>>>>>> Aggregate with Having clause.
>>>>>>> eg: select COUNT(1) from call_center group by cc_class having
>>>>>>> max(cc_tax_percentage) > 0.2;
>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>>>>
>>>>>>> The core discussion points are summarized as follows:
>>>>>>> a) Aggregation is a more complex scenario than predicates or limits,
>>>>>>> and it also depends on the underlying storage.
>>>>>>> b) A single rule can't cover every aggregate scenario, but can
>>>>>>> the SupportsAggregatePushDown interface be a bit more general?
>>>>>>> c) Can CallExpression express the semantics of Calcite's
>>>>>>> AggregateCall?
>>>>>>>
>>>>>>> IMO: Pushing an aggregate down completely is generally hard for
>>>>>>> distributed systems. Usually it requires a GROUP BY that exactly
>>>>>>> matches the partitioning of the downstream storage. At the same time,
>>>>>>> the benefit of removing the final aggregate is actually limited.
>>>>>>> LocalAggPushDown alone generally yields more than 80% of the CPU and
>>>>>>> IO benefits. But I also agree that
>>>>>>> the SupportsAggregatePushDown interface should be as generic as
>>>>>>> possible for future extensions, while we keep confidence
>>>>>>> in the interface we design.
>>>>>>>
>>>>>>> For core point (a): Because aggregation is complex, one
>>>>>>> LogicalAggregate node may expand to "Expand / Calc / LocalXXAgg / Exchange
>>>>>>> / FinalXXAgg"
>>>>>>> in the physical phase. It seems that we can't solve all cases with only
>>>>>>> one rule. So I suggest that PushLocalAggIntoTableSourceScanRule focus only
>>>>>>> on the pattern of TableSourceScan + LocalXXAggregate at present.
>>>>>>>
>>>>>>> For core points (b & c): I think we can change the interface to be:
>>>>>>> ```
>>>>>>>
>>>>>>> boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>>>> aggregateExpressions, DataType producedDataType, List<int[]>
>>>>>>> groupingSets);
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>>
>>>>>>> Simple Group: groupingSets.size() == 1 &&
>>>>>>> Arrays.equals(groupingSets.get(0), groupingFields)
>>>>>>>
>>>>>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>>>>>> groupingFields.length)
>>>>>>> Rollup:
>>>>>>> See org.apache.calcite.rel.core.Aggregate.Group#isRollup
>>>>>>>
>>>>>>> Then we can handle the complex grouping cases. The connector
>>>>>>> developer for the downstream storage should determine
>>>>>>> whether it supports the given grouping type. Filter and
>>>>>>> HAVING clauses are converted to separate Calc RelNodes
>>>>>>> and are no longer part of the LocalAggregate node, so CallExpression may
>>>>>>> be sufficient to express the semantics of AggregateCall.
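The grouping classification described above can be sketched on plain `int[]` grouping sets as follows. This is only an illustration under stated assumptions: `GroupingKindSketch` is a hypothetical helper, not a Flink or Calcite class; the rollup check assumes the sets are ordered from the longest prefix to the empty set; and the cube check verifies that all subsets are present instead of only comparing sizes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that classifies grouping sets given as field-index arrays. */
final class GroupingKindSketch {

    enum Kind { SIMPLE, CUBE, ROLLUP, OTHER }

    static Kind classify(int[] groupingFields, List<int[]> groupingSets) {
        // Simple group: exactly one grouping set, equal to the full field list.
        // Arrays.equals compares contents; int[].equals would compare references.
        if (groupingSets.size() == 1
                && Arrays.equals(groupingSets.get(0), groupingFields)) {
            return Kind.SIMPLE;
        }
        if (isCube(groupingFields, groupingSets)) {
            return Kind.CUBE;
        }
        if (isRollup(groupingFields, groupingSets)) {
            return Kind.ROLLUP;
        }
        return Kind.OTHER;
    }

    /** Cube: all 2^n distinct subsets of the n grouping fields are present. */
    static boolean isCube(int[] fields, List<int[]> sets) {
        int n = fields.length;
        if (n >= 31 || sets.size() != (1 << n)) {
            return false;
        }
        Set<Integer> seenMasks = new HashSet<>();
        for (int[] set : sets) {
            int mask = 0;
            for (int field : set) {
                int pos = indexOf(fields, field);
                if (pos < 0) {
                    return false; // field is not one of the grouping fields
                }
                mask |= 1 << pos;
            }
            seenMasks.add(mask);
        }
        // 2^n distinct subsets of an n-element set means every subset occurs once.
        return seenMasks.size() == sets.size();
    }

    /** Rollup (assumed order): full field list, then each shorter prefix, then empty. */
    static boolean isRollup(int[] fields, List<int[]> sets) {
        if (sets.size() != fields.length + 1) {
            return false;
        }
        for (int i = 0; i < sets.size(); i++) {
            int[] expectedPrefix = Arrays.copyOf(fields, fields.length - i);
            if (!Arrays.equals(sets.get(i), expectedPrefix)) {
                return false;
            }
        }
        return true;
    }

    private static int indexOf(int[] values, int target) {
        for (int i = 0; i < values.length; i++) {
            if (values[i] == target) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] fields = {0, 1};
        List<int[]> rollup = Arrays.asList(new int[] {0, 1}, new int[] {0}, new int[0]);
        System.out.println(classify(fields, rollup));
    }
}
```

A connector could use such a check to accept only the grouping kinds its storage supports and decline the rest.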
>>>>>>>
>>>>>>> What do you think? Looking forward to our further discussion.
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jan 6, 2021 at 2:24 PM Jingsong Li <[hidden email]> wrote:
>>>>>>>
>>>>>>>> > I think filter expressions and grouping sets are semantic
>>>>>>>> arguments instead of utilities. If we want to push them into sources, the
>>>>>>>> connector developers should be aware of them. Wrapping them in a context
>>>>>>>> implicitly is error-prone: existing connectors would produce wrong
>>>>>>>> results when upgrading to new Flink versions.
>>>>>>>>
>>>>>>>> We can have some mechanism to check the upgrading.
>>>>>>>>
>>>>>>>> > I think for these cases, providing a new default method to
>>>>>>>> override might be a better choice.
>>>>>>>>
>>>>>>>> Then we will have three or more methods. For the API level, I
>>>>>>>> really don't like it...
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong
>>>>>>>>
>>>>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> I think filter expressions and grouping sets are semantic
>>>>>>>>> arguments instead of utilities.
>>>>>>>>> If we want to push them into sources, the connector developers
>>>>>>>>> should be aware of them.
>>>>>>>>> Wrapping them in a context implicitly is error-prone: an
>>>>>>>>> existing connector would produce wrong results
>>>>>>>>> when upgrading to a new Flink version (because we push
>>>>>>>>> grouping_sets/filter_args, but the connector ignores them).
>>>>>>>>> I think for these cases, providing a new default method to
>>>>>>>>> override might be a better choice.
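One possible reading of the "new default method" idea above, sketched with hypothetical names: the interface keeps its original method, and a later overload with grouping sets gets a default body that declines anything an older connector cannot know about. `AggPushDownV2Sketch` and `LegacyConnectorSketch` are illustrative only, and `List<String>` stands in for the aggregate expressions.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical interface that grows by a default method instead of a context object. */
interface AggPushDownV2Sketch {

    /** Original method: a single list of grouping fields. */
    boolean applyAggregates(int[] groupingFields, List<String> aggregates);

    /**
     * Hypothetical later addition. The default only delegates the plain case and
     * declines real grouping sets, so a connector that never overrides it cannot
     * silently ignore them.
     */
    default boolean applyAggregates(List<int[]> groupingSets, List<String> aggregates) {
        if (groupingSets.size() == 1) {
            return applyAggregates(groupingSets.get(0), aggregates);
        }
        return false;
    }
}

/** Connector written against the original method only. */
final class LegacyConnectorSketch implements AggPushDownV2Sketch {
    @Override
    public boolean applyAggregates(int[] groupingFields, List<String> aggregates) {
        return true; // accepts every plain GROUP BY in this sketch
    }

    public static void main(String[] args) {
        LegacyConnectorSketch connector = new LegacyConnectorSketch();
        List<int[]> rollup = Arrays.asList(new int[] {0, 1}, new int[] {0}, new int[0]);
        System.out.println(connector.applyAggregates(rollup, Arrays.asList("COUNT(1)")));
    }
}
```

The trade-off raised in the thread still applies: every new kind of argument adds another method to the interface.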
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <[hidden email]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm also curious about aggregate with filter (COUNT(1)
>>>>>>>>>> FILTER(WHERE d > 1)). Can we push it down? I'm not sure that a single call
>>>>>>>>>> expression can express it, and how we should embody it and convey it to
>>>>>>>>>> users.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <
>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>
>>>>>>>>>>> I don't want to limit this interface to LocalAgg Push down.
>>>>>>>>>>> Actually, sometimes, we can push whole aggregation to source too.
>>>>>>>>>>>
>>>>>>>>>>> So, this rule can do something more advanced. For example, we
>>>>>>>>>>> can push down group sets to source too, for the SQL: "GROUP BY GROUPING
>>>>>>>>>>> SETS (f1, f2)". Then, we need to add more information to push down.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <[hidden email]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think this may be over-designed. We should have confidence in
>>>>>>>>>>>> the interface we design; the interface should be stable.
>>>>>>>>>>>> Wrapping things in a big context comes at the cost of user
>>>>>>>>>>>> convenience.
>>>>>>>>>>>> Above all, we don't see any parameters to add in the future. Do
>>>>>>>>>>>> you know of any potential parameters?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jark
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <
>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Well, I mean:
>>>>>>>>>>>>>
>>>>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType producedDataType);`
>>>>>>>>>>>>> VS
>>>>>>>>>>>>> ```
>>>>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>>>>
>>>>>>>>>>>>> interface Aggregation {
>>>>>>>>>>>>>   int[] groupingFields();
>>>>>>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>>>>>>   DataType producedDataType();
>>>>>>>>>>>>> }
>>>>>>>>>>>>> ```
>>>>>>>>>>>>>
>>>>>>>>>>>>> Maybe I'm overthinking it, but I think aggregation is a
>>>>>>>>>>>>> complicated thing. We may need to extend its parameters in the future, so
>>>>>>>>>>>>> making the parameters an interface helps future expansion
>>>>>>>>>>>>> without breaking compatibility with user implementations. With the
>>>>>>>>>>>>> former signature, users would need to modify their code.
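The compatibility argument above can be illustrated with a small sketch. All names here (`AggregationSketch`, `ContextPushDownSketch`, `OldConnectorSketch`) are hypothetical, `List<String>` stands in for `List<CallExpression>`, and the `groupingSets()` accessor is an assumed example of a parameter added later.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical parameter object for the push-down call. */
interface AggregationSketch {
    int[] groupingFields();

    List<String> aggregateExpressions(); // stands in for List<CallExpression>

    /** Assumed later addition; the default describes a plain GROUP BY. */
    default List<int[]> groupingSets() {
        return Collections.singletonList(groupingFields());
    }
}

interface ContextPushDownSketch {
    boolean applyAggregates(AggregationSketch aggregation);
}

/** Simple immutable implementation of the parameter object. */
final class SimpleAggregationSketch implements AggregationSketch {
    private final int[] groupingFields;
    private final List<String> aggregateExpressions;

    SimpleAggregationSketch(int[] groupingFields, List<String> aggregateExpressions) {
        this.groupingFields = groupingFields;
        this.aggregateExpressions = aggregateExpressions;
    }

    @Override
    public int[] groupingFields() {
        return groupingFields;
    }

    @Override
    public List<String> aggregateExpressions() {
        return aggregateExpressions;
    }
}

/** Connector written before groupingSets() existed; it compiles unchanged. */
final class OldConnectorSketch implements ContextPushDownSketch {
    int[] seenGrouping;

    @Override
    public boolean applyAggregates(AggregationSketch aggregation) {
        seenGrouping = aggregation.groupingFields(); // never inspects groupingSets()
        return true;
    }

    public static void main(String[] args) {
        OldConnectorSketch connector = new OldConnectorSketch();
        AggregationSketch agg =
                new SimpleAggregationSketch(new int[] {0, 1}, Arrays.asList("COUNT(1)"));
        System.out.println(connector.applyAggregates(agg));
    }
}
```

The old connector keeps compiling when the accessor is added, which is the benefit argued for here. It also accepts any request without looking at `groupingSets()`, which is the risk of silently wrong results raised elsewhere in this thread.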
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jingsong,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks a lot for your suggestions. These points really need to be
>>>>>>>>>>>>>> made clear in the proposal.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the semantic problem, I think the main point is the
>>>>>>>>>>>>>> difference between the data type returned by the target
>>>>>>>>>>>>>> aggregate function and the row format returned
>>>>>>>>>>>>>> by the underlying storage.
>>>>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>>>>> We need to let developers know that the related connectors
>>>>>>>>>>>>>> must handle the semantic differences between
>>>>>>>>>>>>>> the underlying storage system and Flink.
>>>>>>>>>>>>>> [Supplemented in proposal]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The phase of the new PushLocalAggIntoTableSourceScanRule
>>>>>>>>>>>>>> is also a key point.
>>>>>>>>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE
>>>>>>>>>>>>>> rule set, preferably
>>>>>>>>>>>>>> after the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the scalability of the interface, I don't fully
>>>>>>>>>>>>>> understand your suggestion. Is it to add
>>>>>>>>>>>>>> an abstract class that implements the SupportsAggregatePushDown
>>>>>>>>>>>>>> interface and holds the
>>>>>>>>>>>>>> `List<CallExpression> aggregateExpressions, int[]
>>>>>>>>>>>>>> groupingFields, DataType producedDataType`
>>>>>>>>>>>>>> fields?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 2:44 PM Jingsong Li <[hidden email]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for your proposal, Sebastian!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful
>>>>>>>>>>>>>>> discussion has solved
>>>>>>>>>>>>>>> many of my concerns.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We may need to add some mechanisms or comments, because as
>>>>>>>>>>>>>>> far as I know, the semantics of each database are actually
>>>>>>>>>>>>>>> different, which may need to be reflected in your specific
>>>>>>>>>>>>>>> implementation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For example, the AVG output types of various databases may
>>>>>>>>>>>>>>> be different. MySQL outputs double, which is different from
>>>>>>>>>>>>>>> Flink. What should we do? (Luckily, AVG will be split into
>>>>>>>>>>>>>>> SUM and COUNT, but we also need to take care of decimal and
>>>>>>>>>>>>>>> other types.)
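The remark about AVG being split into SUM and COUNT can be sketched as follows: each source split returns a partial sum and count, and the final aggregate merges them before dividing, so the storage system's own AVG result type never enters the picture. `PartialAvgSketch` is a hypothetical class, and the use of `BigDecimal` with a caller-chosen scale and HALF_UP rounding is an assumption for illustration, not Flink's actual type derivation.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;

/** Hypothetical partial result of AVG, kept as (sum, count) until the final step. */
final class PartialAvgSketch {
    final BigDecimal sum;
    final long count;

    PartialAvgSketch(BigDecimal sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    /** Combines two partial results, as a final aggregate would after the exchange. */
    PartialAvgSketch merge(PartialAvgSketch other) {
        return new PartialAvgSketch(sum.add(other.sum), count + other.count);
    }

    static PartialAvgSketch mergeAll(List<PartialAvgSketch> partials) {
        PartialAvgSketch acc = new PartialAvgSketch(BigDecimal.ZERO, 0L);
        for (PartialAvgSketch partial : partials) {
            acc = acc.merge(partial);
        }
        return acc;
    }

    /** Final division; returns null for zero rows, mirroring SQL's NULL for AVG. */
    BigDecimal finish(int scale) {
        if (count == 0) {
            return null;
        }
        // Scale and rounding mode are illustrative assumptions.
        return sum.divide(BigDecimal.valueOf(count), scale, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        // Two splits: four rows summing to 10, and one row with value 5.
        PartialAvgSketch merged = mergeAll(Arrays.asList(
                new PartialAvgSketch(new BigDecimal("10"), 4),
                new PartialAvgSketch(new BigDecimal("5"), 1)));
        System.out.println(merged.finish(6));
    }
}
```

Averaging the two per-split averages (2.5 and 5) would give 3.75, while merging sums and counts gives 15 / 5 = 3, which is why the partial form is the one worth pushing down.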
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>>>>> phase, which may
>>>>>>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For scalability, I would lean towards introducing an
>>>>>>>>>>>>>>> `Aggregate` interface that contains the
>>>>>>>>>>>>>>> `List<CallExpression> aggregateExpressions, int[]
>>>>>>>>>>>>>>> groupingFields, DataType producedDataType` fields. In this
>>>>>>>>>>>>>>> way, we can add fields easily without breaking compatibility.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think the current design is very good, just put forward
>>>>>>>>>>>>>>> some ideas.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Thanks for your further feedback and help. The interface of
>>>>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > For (1): Agreed. Yes, the upstream only needs to know whether
>>>>>>>>>>>>>>> > the TableSource can handle all of the aggregates.
>>>>>>>>>>>>>>> > It's better to just return a boolean that indicates whether
>>>>>>>>>>>>>>> > pushing down all of the aggregates succeeded.
>>>>>>>>>>>>>>> > [Resolved in proposal]
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > For (2): Agreed. The aggOutputDataType represents the produced
>>>>>>>>>>>>>>> > data type of the new table source, to make sure that the new
>>>>>>>>>>>>>>> > table source can connect with the related exchange node. The
>>>>>>>>>>>>>>> > format of this aggOutputDataType is the groupedFields' types +
>>>>>>>>>>>>>>> > the agg functions' return types.
>>>>>>>>>>>>>>> > The reason for adding this parameter is also to make it easier
>>>>>>>>>>>>>>> > for the user to build the final output type. I have renamed
>>>>>>>>>>>>>>> > this parameter to producedDataType. [Resolved in proposal]
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > For (3): Agreed. Indeed, groupSet may mislead users, so I have
>>>>>>>>>>>>>>> > changed it to groupingFields. [Resolved in proposal]
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Thanks again for the suggestions; looking forward to further
>>>>>>>>>>>>>>> > discussion.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > On Tue, Jan 5, 2021 at 12:05 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Regarding the updated interface,
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>>>>>>>>>>>>> > >        private final List<CallExpression> remainingAggregates;
>>>>>>>>>>>>>>> > > }
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > I have the following comments:
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a
>>>>>>>>>>>>>>> boolean return type
>>>>>>>>>>>>>>> > > enough?
>>>>>>>>>>>>>>> > >     From my understanding, all of the aggregates should
>>>>>>>>>>>>>>> be accepted,
>>>>>>>>>>>>>>> > > otherwise the pushdown should fail.
>>>>>>>>>>>>>>> > >     Therefore, users don't need to distinguish which
>>>>>>>>>>>>>>> aggregates are
>>>>>>>>>>>>>>> > > "accepted".
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced data
>>>>>>>>>>>>>>> > > type of the new source, or just the return type of all the
>>>>>>>>>>>>>>> > > agg functions?
>>>>>>>>>>>>>>> > >     I would prefer `producedDataType`, just like
>>>>>>>>>>>>>>> > > `SupportsReadingMetadata`, to reduce the effort for users
>>>>>>>>>>>>>>> > > to concatenate a final output type.
>>>>>>>>>>>>>>> > >     The return type of each agg function can be obtained
>>>>>>>>>>>>>>> > > from the `CallExpression`.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to
>>>>>>>>>>>>>>> `grouping` or
>>>>>>>>>>>>>>> > > `groupedFields` ?
>>>>>>>>>>>>>>> > >     The `groupSet` may confuse users that it relates to
>>>>>>>>>>>>>>> "grouping sets".
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Best,
>>>>>>>>>>>>>>> > > Jark
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <
>>>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>>> > >> wrote:
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> Thanks a lot for your feedback. If local aggregation is
>>>>>>>>>>>>>>> > >>> more like a physical operator than a logical operator, I
>>>>>>>>>>>>>>> > >>> think your suggestion should be idea #2, which handles
>>>>>>>>>>>>>>> > >>> everything in the physical optimization phase?
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> Looking forward to further discussion.
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> On Tue, Jan 5, 2021 at 9:52 AM Kurt Young <[hidden email]> wrote:
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator
>>>>>>>>>>>>>>> rather than logical
>>>>>>>>>>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>>> > >>>> wrote:
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>> > Hi Jark, thanks a lot for your quick reply and valuable
>>>>>>>>>>>>>>> > >>>> > suggestions.
>>>>>>>>>>>>>>> > >>>> > For (1): Agreed. Since we are in the middle of upgrading to
>>>>>>>>>>>>>>> > >>>> > the new table source API, the new optimizer rule really should
>>>>>>>>>>>>>>> > >>>> > use the new interface. If the new rule doesn't use the new API,
>>>>>>>>>>>>>>> > >>>> > we'll have to upgrade it sooner or later. I have changed the
>>>>>>>>>>>>>>> > >>>> > SupportsAggregatePushDown definition in the proposal above to
>>>>>>>>>>>>>>> > >>>> > use the ability interface.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > For (2): Agreed. Changing to CallExpression is a better choice,
>>>>>>>>>>>>>>> > >>>> > and I have resolved this comment in the proposal.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC connector, as we
>>>>>>>>>>>>>>> > >>>> > don't have a Druid connector, and the ES connector only has a
>>>>>>>>>>>>>>> > >>>> > sink API at present.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > But perhaps the biggest question is whether we should use
>>>>>>>>>>>>>>> > >>>> > idea 1 or idea 2 in the proposal.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > What do you think? After we reach agreement on the proposal,
>>>>>>>>>>>>>>> > >>>> > our team can drive this feature to completion.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > On Tue, Dec 29, 2020 at 2:58 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great improvement
>>>>>>>>>>>>>>> > >>>> > > for Flink SQL.
>>>>>>>>>>>>>>> > >>>> > > I went through the design doc and have the following thoughts:
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
>>>>>>>>>>>>>>> > >>>> > > proposed a new set of DynamicTableSource interfaces. Could you
>>>>>>>>>>>>>>> > >>>> > > update your proposal to use the new interfaces?
>>>>>>>>>>>>>>> > >>>> > > Follow the existing ability interfaces, e.g.
>>>>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a better
>>>>>>>>>>>>>>> > >>>> > > representation than separate `FunctionDefinition` and args,
>>>>>>>>>>>>>>> > >>>> > > because it would be easier to know the index and type of the
>>>>>>>>>>>>>>> > >>>> > > arguments.
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > 3) It would be better to list which connectors will be
>>>>>>>>>>>>>>> > >>>> > > supported in the plan.
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>>>>>>>>>>>> > [hidden email]>
>>>>>>>>>>>>>>> > >>>> > wrote:
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink
>>>>>>>>>>>>>>> Planner.
>>>>>>>>>>>>>>> > >>>> > > > Aggregate operator of Flink SQL is currently
>>>>>>>>>>>>>>> fully done at Flink
>>>>>>>>>>>>>>> > >>>> layer.
>>>>>>>>>>>>>>> > >>>> > > > With the developing of storage, many downstream
>>>>>>>>>>>>>>> storage of Flink
>>>>>>>>>>>>>>> > >>>> SQL
>>>>>>>>>>>>>>> > >>>> > has
>>>>>>>>>>>>>>> > >>>> > > > the ability to deal with Aggregation operator.
>>>>>>>>>>>>>>> > >>>> > > > Pushing down Aggregate to data source layer
>>>>>>>>>>>>>>> will improve
>>>>>>>>>>>>>>> > >>>> performance
>>>>>>>>>>>>>>> > >>>> > from
>>>>>>>>>>>>>>> > >>>> > > > the perspective of the network IO and
>>>>>>>>>>>>>>> computation overhead.
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new
>>>>>>>>>>>>>>> feature.
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese
>>>>>>>>>>>>>>> Academy of Science
>>>>>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>>> > >>>> > > > E-mail: [hidden email] <
>>>>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>>>>>>> > >>>> > > >

--

*With kind regards
------------------------------------------------------------
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: [hidden email] <[hidden email]>
QQ: 3239559*
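
For readers following the thread, here is a minimal sketch of how a connector might implement the interface shape discussed above. Everything in it is an assumption made for illustration: `AggregateExpressionSketch`, `SupportsAggregatePushDownSketch` and `ToyJdbcSource` are hypothetical stand-ins rather than Flink classes, and `DataType` is reduced to a `String` so the example compiles on its own. The all-or-nothing boolean follows the discussion: either every aggregate is accepted, or the push-down fails as a whole.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the proposed AggregateExpression (not the Flink class).
final class AggregateExpressionSketch {
    final String functionName;       // e.g. "COUNT", "SUM"
    final List<Integer> argIndexes;  // indexes of the referenced input fields
    final boolean hasFilter;         // true if FILTER(WHERE ...) is attached
    final boolean distinct;

    AggregateExpressionSketch(String functionName, List<Integer> argIndexes,
                              boolean hasFilter, boolean distinct) {
        this.functionName = functionName;
        this.argIndexes = argIndexes;
        this.hasFilter = hasFilter;
        this.distinct = distinct;
    }
}

// Mirrors the method shape from the thread; DataType is simplified to String.
interface SupportsAggregatePushDownSketch {
    boolean applyAggregates(List<int[]> groupingSets,
                            List<AggregateExpressionSketch> aggregates,
                            String producedDataType);
}

// Hypothetical source: accepts one grouping set and plain COUNT/SUM/MIN/MAX only.
final class ToyJdbcSource implements SupportsAggregatePushDownSketch {
    private static final List<String> SUPPORTED =
            Arrays.asList("COUNT", "SUM", "MIN", "MAX");

    String pushedDownType; // stays null until a push-down is accepted

    @Override
    public boolean applyAggregates(List<int[]> groupingSets,
                                   List<AggregateExpressionSketch> aggregates,
                                   String producedDataType) {
        if (groupingSets.size() != 1) {
            return false; // rollup/cube/grouping sets are not handled by this toy source
        }
        for (AggregateExpressionSketch agg : aggregates) {
            if (agg.hasFilter || agg.distinct || !SUPPORTED.contains(agg.functionName)) {
                return false; // reject everything if any single aggregate is unsupported
            }
        }
        pushedDownType = producedDataType; // remember the new produced row type
        return true;
    }
}
```

Returning `false` leaves the source untouched, so the planner would keep the local aggregate in Flink; a real connector would also have to translate the accepted aggregates into its own query.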

Re: Support local aggregate push down for Blink batch planner

Jark Wu-2
Great! Thanks for pushing this work.
Looking forward to the pull requests.

Best,
Jark

On Fri, 8 Jan 2021 at 17:57, Sebastian Liu <[hidden email]> wrote:

> Hi Jark,
>
> Cool, following your suggestions I have created three related subtasks
> under FLINK-20791.
> Could you assign these subtasks to me too when you have time? I
> will then push the relevant implementation forward.
>
> On Fri, Jan 8, 2021 at 12:30 PM, Jark Wu <[hidden email]> wrote:
>
>> Hi Sebastian,
>>
>> I assigned the issue to you. But I suggest creating sub-tasks under this
>> issue, because I think this will be a big contribution.
>> For example, you can split it into:
>> 1. Introduce SupportsAggregatePushDown interface
>> 2. Support SupportsAggregatePushDown in planner
>> 3. Support SupportsAggregatePushDown for JDBC source
>> 4. ...
>>
>> Best,
>> Jark
>>
>> On Thu, 7 Jan 2021 at 23:27, Sebastian Liu <[hidden email]> wrote:
>>
>>> Hi Jark,
>>>
>>> It seems that we have reached agreement on the proposal. Could you
>>> please assign the Jira ticket below to me?
>>> https://issues.apache.org/jira/browse/FLINK-20791
>>>
>>> On Thu, Jan 7, 2021 at 10:25 AM, Jark Wu <[hidden email]> wrote:
>>>
>>>> Thanks for updating the design doc.
>>>> It looks good to me.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Thu, 7 Jan 2021 at 10:16, Jingsong Li <[hidden email]>
>>>> wrote:
>>>>
>>>>> Sounds good to me.
>>>>>
>>>>> We don't have to worry about future changes, because it covers
>>>>> all the capabilities of Calcite aggregation.
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> Sounds good to me. For better scalability in the future, we could add
>>>>>> the AggregateExpression.
>>>>>> ```
>>>>>> public class AggregateExpression implements ResolvedExpression {
>>>>>>    private final FunctionDefinition functionDefinition;
>>>>>>    private final List<FieldReferenceExpression> args;
>>>>>>    private final @Nullable CallExpression filterExpression;
>>>>>>    private final DataType resultType;
>>>>>>    private final boolean distinct;
>>>>>>    private final boolean approximate;
>>>>>>    private final boolean ignoreNulls;
>>>>>> }
>>>>>> ```
>>>>>>
>>>>>> And we really only need one GroupingSets parameter for grouping. I
>>>>>> have updated the related interface in the proposal.
>>>>>> Appreciate the continued feedback and help.
>>>>>>
>>>>>> On Wed, Jan 6, 2021 at 9:34 PM, Jark Wu <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi Liu, Jingsong,
>>>>>>>
>>>>>>> Regarding the agg with filter, I think in theory we can support
>>>>>>> pushing such a pattern into source.
>>>>>>> We don't need to support it in the first version, but in the long
>>>>>>> term, we can support it.
>>>>>>> The designed interface should be future proof.
>>>>>>>
>>>>>>> I think the filter argument and the distinct flag should be part of
>>>>>>> the aggregate expression,
>>>>>>> so I'm wondering whether CallExpression is a good representation for it.
>>>>>>> What do you think about proposing the following
>>>>>>> `AggregateExpression` to replace the `CallExpression`?
>>>>>>>
>>>>>>> class AggregateExpression implements ResolvedExpression {
>>>>>>>     private final FunctionDefinition functionDefinition;
>>>>>>>     private final List<FieldReferenceExpression> args;
>>>>>>>     private final @Nullable CallExpression filterExpr;
>>>>>>>     private final boolean distinct;
>>>>>>> }
>>>>>>>
>>>>>>> Besides, we don't need both groupingFields and groupingSets.
>>>>>>> `groupingSets` should be a superset of groupingFields.
>>>>>>> Then the interface of SupportsAggregatePushDown can be:
>>>>>>>
>>>>>>> interface SupportsAggregatePushDown {
>>>>>>>
>>>>>>>   boolean applyAggregates(
>>>>>>>     List<int[]> groupingSets,
>>>>>>>     List<AggregateExpression> aggregates,
>>>>>>>     DataType producedDataType);
>>>>>>> }
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <[hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jingsong, Jark,
>>>>>>>>
>>>>>>>> Thanks so much for the discussion; the cases mentioned above are
>>>>>>>> well worth discussing further.
>>>>>>>>
>>>>>>>> 1. For aggregate with filter expressions: eg: select COUNT(1)
>>>>>>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) ->
>>>>>>>> LocalAgg -> Exchange -> FinalAgg.
>>>>>>>> As there is a Calc above the TableSource, this pattern can't match
>>>>>>>> the LocalAggPushDownRule in the current design.
>>>>>>>>
>>>>>>>> 2. For the grouping set or rollup use case: eg: select COUNT(1)
>>>>>>>> from call_center group by rollup(cc_class, cc_employees);
>>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>>>> Calc.
>>>>>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>>>>>
>>>>>>>> 3. I want to add a case which we haven't discussed yet.
>>>>>>>> Aggregate with Having clause.
>>>>>>>> eg: select COUNT(1) from call_center group by cc_class having
>>>>>>>> max(cc_tax_percentage) > 0.2;
>>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>>>>>
>>>>>>>> The core discussion points are summarized as follows:
>>>>>>>> a) Aggregation is more complex than predicates or limits,
>>>>>>>> and it also depends on the underlying storage system.
>>>>>>>> b) One rule cannot cover every aggregate scenario,
>>>>>>>> but can the SupportsAggregatePushDown interface be a bit more general?
>>>>>>>> c) Can CallExpression express the semantics of Calcite's
>>>>>>>> AggregateCall?
>>>>>>>>
>>>>>>>> IMO: Pushing down the complete aggregate is generally hard for
>>>>>>>> distributed systems. Usually the GROUP BY must exactly
>>>>>>>> match the partitioning of the downstream storage. At the same time,
>>>>>>>> the benefit of removing the final aggregate is actually limited:
>>>>>>>> LocalAggPushDown generally yields more than 80% of the CPU and
>>>>>>>> IO benefits. But I also agree that
>>>>>>>> the SupportsAggregatePushDown interface should be as generic as
>>>>>>>> possible for future extensions, while we keep confidence
>>>>>>>> in the interface we design.
>>>>>>>>
>>>>>>>> For core point (a): Because aggregation is complex, one
>>>>>>>> LogicalAggregate node may expand into "Expand / Calc / LocalXXAgg / Exchange
>>>>>>>> / FinalXXAgg"
>>>>>>>> in the physical phase. It seems we can't handle all cases with a
>>>>>>>> single rule, so I suggest that PushLocalAggIntoTableSourceScanRule focus only
>>>>>>>> on the TableSourceScan + LocalXXAggregate pattern for now.
>>>>>>>>
>>>>>>>> For core points (b & c): I think we can change the interface to be:
>>>>>>>> ```
>>>>>>>>
>>>>>>>> boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>>>>> aggregateExpressions, DataType producedDataType, List<int[]>
>>>>>>>> groupingSets);
>>>>>>>>
>>>>>>>> ```
>>>>>>>>
>>>>>>>>
>>>>>>>> Simple Group: groupingSets.size() == 1 &&
>>>>>>>> Arrays.equals(groupingSets.get(0), groupingFields)
>>>>>>>>
>>>>>>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>>>>>>> groupingFields.length)
>>>>>>>> Rollup:
>>>>>>>> See org.apache.calcite.rel.core.Aggregate.Group#isRollup
>>>>>>>>
>>>>>>>> Then we can handle the complex grouping cases. The connector
>>>>>>>> developer for the downstream storage should determine
>>>>>>>> whether it supports the given grouping type. The filter
>>>>>>>> and HAVING clauses are converted into separate Calc RelNodes
>>>>>>>> and are no longer part of the LocalAggregate node, so CallExpression may be
>>>>>>>> sufficient to express the semantics of AggregateCall.
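
A minimal sketch of the grouping checks listed above, applied in the order given in the message (simple, then cube, then rollup). The class `GroupingKind` and its methods are hypothetical names for illustration. The rollup test here is a simplified prefix check written for this example; it is not Calcite's `Aggregate.Group#isRollup`, and it assumes the grouping sets arrive ordered from the longest prefix to the empty set.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper a connector could use to decide which grouping shapes it accepts.
final class GroupingKind {
    enum Kind { SIMPLE, CUBE, ROLLUP, OTHER }

    static Kind classify(int[] groupingFields, List<int[]> groupingSets) {
        int n = groupingFields.length;
        // Simple group: exactly one grouping set, identical to the grouping fields.
        if (groupingSets.size() == 1 && Arrays.equals(groupingSets.get(0), groupingFields)) {
            return Kind.SIMPLE;
        }
        // Cube: one grouping set per subset of the fields, i.e. 2^n sets (size heuristic only).
        if (n < 31 && groupingSets.size() == (1 << n)) {
            return Kind.CUBE;
        }
        if (isPrefixRollup(groupingFields, groupingSets)) {
            return Kind.ROLLUP;
        }
        return Kind.OTHER;
    }

    // Simplified rollup check: (a, b), (a), () for fields [a, b], in that order.
    static boolean isPrefixRollup(int[] fields, List<int[]> sets) {
        if (sets.size() != fields.length + 1) {
            return false;
        }
        for (int i = 0; i < sets.size(); i++) {
            int[] expectedPrefix = Arrays.copyOf(fields, fields.length - i);
            if (!Arrays.equals(sets.get(i), expectedPrefix)) {
                return false;
            }
        }
        return true;
    }
}
```

With a single grouping field, rollup and cube produce the same two grouping sets, so this ordering reports CUBE for that case; a connector that supports either shape would treat both the same way.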
>>>>>>>>
>>>>>>>> What do you think? Looking forward to our further discussion.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jan 6, 2021 at 2:24 PM, Jingsong Li <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> > I think filter expressions and grouping sets are semantic
>>>>>>>>> > arguments instead of utilities. If we want to push them into sources,
>>>>>>>>> > the connector developers should be aware of them. Wrapping them in a
>>>>>>>>> > context implicitly is error-prone, in that existing connectors will
>>>>>>>>> > produce wrong results when upgrading to new Flink versions.
>>>>>>>>>
>>>>>>>>> We can have some mechanism to check the upgrading.
>>>>>>>>>
>>>>>>>>> > I think for these cases, providing a new default method to
>>>>>>>>> > override might be a better choice.
>>>>>>>>>
>>>>>>>>> Then we will have three or more methods. For the API level, I
>>>>>>>>> really don't like it...
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong
>>>>>>>>>
>>>>>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> I think filter expressions and grouping sets are semantic
>>>>>>>>>> arguments instead of utilities.
>>>>>>>>>> If we want to push them into sources, the connector developers
>>>>>>>>>> should be aware of them.
>>>>>>>>>> Wrapping them in a context implicitly is error-prone, in that
>>>>>>>>>> existing connectors will produce wrong results
>>>>>>>>>> when upgrading to new Flink versions (as we are pushing
>>>>>>>>>> grouping_sets/filter_args, but the connector ignores them).
>>>>>>>>>> I think for these cases, providing a new default method to
>>>>>>>>>> override might be a better choice.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>>
>>>>>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <[hidden email]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm also curious about aggregate with filter (COUNT(1)
>>>>>>>>>>> FILTER(WHERE d > 1)). Can we push it down? I'm not sure that a single call
>>>>>>>>>>> expression can express it, and how we should embody it and convey it to
>>>>>>>>>>> users.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <
>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>
>>>>>>>>>>>> I don't want to limit this interface to LocalAgg push-down.
>>>>>>>>>>>> Actually, sometimes we can push the whole aggregation to the source too.
>>>>>>>>>>>>
>>>>>>>>>>>> So, this rule can do something more advanced. For example, we
>>>>>>>>>>>> can push down grouping sets to the source too, for SQL such as "GROUP BY
>>>>>>>>>>>> GROUPING SETS (f1, f2)". Then we need to add more information to the push-down.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <[hidden email]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think this may be over-designed. We should have confidence
>>>>>>>>>>>>> in the interface we design; the interface should be stable.
>>>>>>>>>>>>> Wrapping things in a big context comes at the cost of user
>>>>>>>>>>>>> convenience.
>>>>>>>>>>>>> Above all, we don't see any parameters to add in the future. Do
>>>>>>>>>>>>> you know of any potential parameters?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <
>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Well, I mean:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType producedDataType);`
>>>>>>>>>>>>>> VS
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> interface Aggregation {
>>>>>>>>>>>>>>   int[] groupingFields();
>>>>>>>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>>>>>>>   DataType producedDataType();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Maybe I've overthought it, but I think aggregation is
>>>>>>>>>>>>>> complicated. We may need to extend its parameters in the future, so
>>>>>>>>>>>>>> wrapping the parameters in an interface allows future expansion
>>>>>>>>>>>>>> without breaking the compatibility of user implementations. With the
>>>>>>>>>>>>>> former approach, users would need to modify their code.
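
A minimal sketch of the trade-off being debated here, under stated assumptions: `AggregationParams`, `AggregatePushDownWithContext`, `FixedParams` and `LegacyConnector` are hypothetical names, and `CallExpression` / `DataType` are replaced by `String` so the example stands alone. Adding `groupingSets()` later as a default method keeps an older connector compiling, which is the compatibility benefit; the same connector never reads the new field, which is the risk of silently wrong results raised elsewhere in this thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical parameter object in the spirit of the `Aggregation` interface above.
interface AggregationParams {
    int[] groupingFields();
    List<String> aggregateExpressions(); // stand-in for List<CallExpression>
    String producedDataType();           // stand-in for DataType

    // Imagined later addition: a default method does not break existing implementations.
    default List<int[]> groupingSets() {
        return Collections.singletonList(groupingFields());
    }
}

interface AggregatePushDownWithContext {
    boolean applyAggregates(AggregationParams agg);
}

// Simple immutable implementation used by the "planner" side of this sketch.
final class FixedParams implements AggregationParams {
    private final int[] fields;
    private final List<int[]> sets;

    FixedParams(int[] fields, List<int[]> sets) {
        this.fields = fields;
        this.sets = sets;
    }

    @Override public int[] groupingFields() { return fields; }
    @Override public List<String> aggregateExpressions() { return Arrays.asList("COUNT(1)"); }
    @Override public String producedDataType() { return "ROW<k INT, c BIGINT>"; }
    @Override public List<int[]> groupingSets() { return sets; }
}

// Connector written before groupingSets() existed: it still compiles unchanged,
// but it only looks at groupingFields() and never notices extra grouping sets.
final class LegacyConnector implements AggregatePushDownWithContext {
    @Override
    public boolean applyAggregates(AggregationParams agg) {
        return agg.groupingFields().length <= 1;
    }
}
```

With explicit method parameters instead, adding a grouping-sets argument would force every connector to be touched at compile time, which costs convenience but makes the new semantics impossible to ignore.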
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jingsong,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thx a lot for your suggestion. These points really need to
>>>>>>>>>>>>>>> be clear in the proposal.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For the semantic problem, I think the main point is the
>>>>>>>>>>>>>>> different returned data types
>>>>>>>>>>>>>>> for the target aggregate function and the row format
>>>>>>>>>>>>>>> returned by the underlying storage.
>>>>>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>>>>>> We need to let developers know that the
>>>>>>>>>>>>>>> semantic differences between
>>>>>>>>>>>>>>> the underlying storage system and Flink must be handled in the related
>>>>>>>>>>>>>>> connectors. [Supplemented in proposal]
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule
>>>>>>>>>>>>>>> rule, it's also a key point.
>>>>>>>>>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE
>>>>>>>>>>>>>>> rule set, and better to put it
>>>>>>>>>>>>>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For the scalability of the interface, I don't fully
>>>>>>>>>>>>>>> understand your suggestion. Do you mean adding
>>>>>>>>>>>>>>> an abstract class that implements the
>>>>>>>>>>>>>>> SupportsAggregatePushDown interface and holds the
>>>>>>>>>>>>>>> `List<CallExpression> aggregateExpressions, int[]
>>>>>>>>>>>>>>> groupingFields, DataType producedDataType`
>>>>>>>>>>>>>>> fields?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 2:44 PM, Jingsong Li <[hidden email]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your proposal! Sebastian.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful
>>>>>>>>>>>>>>>> discussion has solved
>>>>>>>>>>>>>>>> many of my concerns.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We may need to add some mechanisms or comments, because as
>>>>>>>>>>>>>>>> far as I know,
>>>>>>>>>>>>>>>> the semantics of each database is actually different, which
>>>>>>>>>>>>>>>> may need to be
>>>>>>>>>>>>>>>> reflected in your specific implementation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For example, the AVG output types of various databases may
>>>>>>>>>>>>>>>> be different.
>>>>>>>>>>>>>>>> For example, MySQL outputs double, which is different from
>>>>>>>>>>>>>>>> Flink. What
>>>>>>>>>>>>>>>> should we do? (Luckily, avg will be split into sum and
>>>>>>>>>>>>>>>> count, but we also
>>>>>>>>>>>>>>>> need to take care of decimal and other types.)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>>>>>> phase, which may
>>>>>>>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>>>>>>>>>>> `Aggregate`
>>>>>>>>>>>>>>>> interface, it contains `List<CallExpression>
>>>>>>>>>>>>>>>> aggregateExpressions, int[]
>>>>>>>>>>>>>>>> groupingFields, DataType producedDataType` fields. In this
>>>>>>>>>>>>>>>> way, we can add
>>>>>>>>>>>>>>>> fields easily without breaking compatibility.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think the current design is very good, just put forward
>>>>>>>>>>>>>>>> some ideas.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some
>>>>>>>>>>>>>>>> adjustments.
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > For (1) Agree: Yeah, the upstream only needs to know whether
>>>>>>>>>>>>>>>> > the TableSource can handle all of the aggregates.
>>>>>>>>>>>>>>>> > It's better to just return a boolean to indicate whether
>>>>>>>>>>>>>>>> > pushing down all of the aggregates succeeded. [Resolved in
>>>>>>>>>>>>>>>> > proposal]
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > For (2) Agree: The aggOutputDataType represents the
>>>>>>>>>>>>>>>> > produced data type of the new table source, to make sure that
>>>>>>>>>>>>>>>> > the new table source can connect with the related exchange node.
>>>>>>>>>>>>>>>> > The format of this aggOutputDataType is the groupedFields' types
>>>>>>>>>>>>>>>> > followed by the agg functions' return types.
>>>>>>>>>>>>>>>> > The reason for adding this parameter to the function is
>>>>>>>>>>>>>>>> > also to make it easier for the user to build the final output type.
>>>>>>>>>>>>>>>> > I have renamed this parameter
>>>>>>>>>>>>>>>> > to producedDataType. [Resolved in proposal]
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, so I have
>>>>>>>>>>>>>>>> > changed it to groupingFields. [Resolved in proposal]
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > Thanks again for the suggestions; looking forward to further
>>>>>>>>>>>>>>>> > discussion.
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > On Tue, Jan 5, 2021 at 12:05 PM, Jark Wu <[hidden email]> wrote:
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > Regarding the updated interface,
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>>>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>>>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>>>>>>>>>>>>>> > >        private final List<CallExpression> remainingAggregates;
>>>>>>>>>>>>>>>> > > }
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > I have following comments:
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a
>>>>>>>>>>>>>>>> boolean return type
>>>>>>>>>>>>>>>> > > enough?
>>>>>>>>>>>>>>>> > >     From my understanding, all of the aggregates should
>>>>>>>>>>>>>>>> be accepted,
>>>>>>>>>>>>>>>> > > otherwise the pushdown should fail.
>>>>>>>>>>>>>>>> > >     Therefore, users don't need to distinguish which
>>>>>>>>>>>>>>>> aggregates are
>>>>>>>>>>>>>>>> > > "accepted".
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced
>>>>>>>>>>>>>>>> data type of the
>>>>>>>>>>>>>>>> > > new source, or just the return type of all the agg
>>>>>>>>>>>>>>>> functions?
>>>>>>>>>>>>>>>> > >     I would prefer to `producedDataType` just like
>>>>>>>>>>>>>>>> > > `SupportsReadingMetadata` to reduce the effort for
>>>>>>>>>>>>>>>> users to concat a
>>>>>>>>>>>>>>>> > final
>>>>>>>>>>>>>>>> > > output type.
>>>>>>>>>>>>>>>> > >     The return type of each agg function can be
>>>>>>>>>>>>>>>> obtained from the
>>>>>>>>>>>>>>>> > > `CallExpression`.
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to
>>>>>>>>>>>>>>>> `grouping` or
>>>>>>>>>>>>>>>> > > `groupedFields` ?
>>>>>>>>>>>>>>>> > >     The `groupSet` may confuse users that it relates to
>>>>>>>>>>>>>>>> "grouping sets".
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > Best,
>>>>>>>>>>>>>>>> > > Jark
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <
>>>>>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>>>> > >> wrote:
>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is
>>>>>>>>>>>>>>>> more like a
>>>>>>>>>>>>>>>> > >>> physical operator rather than logical
>>>>>>>>>>>>>>>> > >>> operator, I think your suggestion should be idea #2
>>>>>>>>>>>>>>>> which handle all in
>>>>>>>>>>>>>>>> > >>> the physical optimization phase?
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>> Looking forward to further discussion.
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>> On Tue, Jan 5, 2021 at 9:52 AM, Kurt Young <[hidden email]> wrote:
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator
>>>>>>>>>>>>>>>> rather than logical
>>>>>>>>>>>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>>>> > >>>> wrote:
>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and
>>>>>>>>>>>>>>>> valuable suggestions.
>>>>>>>>>>>>>>>> > >>>> > For (1): Agree: Since we are in the period of
>>>>>>>>>>>>>>>> upgrading the new
>>>>>>>>>>>>>>>> > table
>>>>>>>>>>>>>>>> > >>>> > source api,
>>>>>>>>>>>>>>>> > >>>> > we really should consider the new interface for
>>>>>>>>>>>>>>>> the new optimize
>>>>>>>>>>>>>>>> > >>>> rule. If
>>>>>>>>>>>>>>>> > >>>> > the new rule
>>>>>>>>>>>>>>>> > >>>> > doesn't use the new api, we'll have to upgrade it
>>>>>>>>>>>>>>>> sooner or later. I
>>>>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>>>>> > >>>> > changed to use
>>>>>>>>>>>>>>>> > >>>> > the ability interface for the
>>>>>>>>>>>>>>>> SupportsAggregatePushDown definition
>>>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>>>> > >>>> above
>>>>>>>>>>>>>>>> > >>>> > proposal.
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > For (2): Agree: Change to use CallExpression is a
>>>>>>>>>>>>>>>> better choice, and
>>>>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>>>>> > >>>> > resolved this
>>>>>>>>>>>>>>>> > >>>> > comment in the proposal.
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC
>>>>>>>>>>>>>>>> connector, as we don't
>>>>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>>>>> > >>>> > Druid connector
>>>>>>>>>>>>>>>> > >>>> > and ES connector just has sink api at present.
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > But perhaps the biggest question may be whether we
>>>>>>>>>>>>>>>> should use idea 1
>>>>>>>>>>>>>>>> > >>>> or
>>>>>>>>>>>>>>>> > >>>> > idea 2 in proposal.
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > What do you think?  After we reach the agreement
>>>>>>>>>>>>>>>> on the proposal,
>>>>>>>>>>>>>>>> > our
>>>>>>>>>>>>>>>> > >>>> team
>>>>>>>>>>>>>>>> > >>>> > can drive to
>>>>>>>>>>>>>>>> > >>>> > complete this feature.
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > On Tue, Dec 29, 2020 at 2:58 PM, Jark Wu <[hidden email]> wrote:
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>>>>>>>>>>>>> improvement for
>>>>>>>>>>>>>>>> > >>>> Flink
>>>>>>>>>>>>>>>> > >>>> > SQL.
>>>>>>>>>>>>>>>> > >>>> > > I went through the design doc and have following
>>>>>>>>>>>>>>>> thoughts:
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource
>>>>>>>>>>>>>>>> in 1.11 and
>>>>>>>>>>>>>>>> > proposed
>>>>>>>>>>>>>>>> > >>>> a new
>>>>>>>>>>>>>>>> > >>>> > >  set of DynamicTableSource interfaces. Could you
>>>>>>>>>>>>>>>> update your
>>>>>>>>>>>>>>>> > >>>> proposal to
>>>>>>>>>>>>>>>> > >>>> > > use the new interfaces?
>>>>>>>>>>>>>>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>>>>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown,
>>>>>>>>>>>>>>>> SupportsProjectionPushDown.
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a
>>>>>>>>>>>>>>>> better
>>>>>>>>>>>>>>>> > >>>> representation
>>>>>>>>>>>>>>>> > >>>> > than
>>>>>>>>>>>>>>>> > >>>> > > separate `FunctionDefinition` and args. Because,
>>>>>>>>>>>>>>>> it would be
>>>>>>>>>>>>>>>> > easier
>>>>>>>>>>>>>>>> > >>>> to
>>>>>>>>>>>>>>>> > >>>> > know
>>>>>>>>>>>>>>>> > >>>> > > what's the index and type of the arguments.
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > 3) It would be better to list which connectors
>>>>>>>>>>>>>>>> will be supported
>>>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>>>> > >>>> the
>>>>>>>>>>>>>>>> > >>>> > > plan?
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>>>>>> > >>>> > >
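Point 2 above can be illustrated with a small, self-contained sketch. It does not use Flink's real classes: `FieldRef`, `AggCall`, and `describe` are hypothetical stand-ins, assumed here only to show why a call-style expression that carries resolved arguments is convenient for a connector, since each argument's name, input index, and type can be read directly from the call.

```java
import java.util.Arrays;
import java.util.List;

public class AggCallSketch {

    /** Hypothetical stand-in for a resolved field reference (not a Flink class). */
    static final class FieldRef {
        final String name;
        final int index;   // position of the field in the input row
        final String type; // data type, kept as a plain string in this sketch

        FieldRef(String name, int index, String type) {
            this.name = name;
            this.index = index;
            this.type = type;
        }
    }

    /** Hypothetical stand-in for a call-style aggregate: function plus resolved args. */
    static final class AggCall {
        final String function;
        final List<FieldRef> args;
        final String resultType;

        AggCall(String function, List<FieldRef> args, String resultType) {
            this.function = function;
            this.args = args;
            this.resultType = resultType;
        }
    }

    /** Renders everything a connector could read from the call in one pass. */
    static String describe(AggCall call) {
        StringBuilder sb = new StringBuilder(call.function).append('(');
        for (int i = 0; i < call.args.size(); i++) {
            FieldRef arg = call.args.get(i);
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(arg.name).append('@').append(arg.index).append(':').append(arg.type);
        }
        return sb.append(") -> ").append(call.resultType).toString();
    }

    public static void main(String[] args) {
        AggCall sum = new AggCall(
                "SUM", Arrays.asList(new FieldRef("amount", 2, "BIGINT")), "BIGINT");
        System.out.println(describe(sum));
    }
}
```

With a bare function definition and a separate argument list, the connector would have to look up the index and type of each argument elsewhere; here they travel with the expression.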
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <[hidden email]> wrote:
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink planner.
>>>>>>>>>>>>>>>> > >>>> > > > The aggregate operator of Flink SQL is currently executed entirely
>>>>>>>>>>>>>>>> > >>>> > > > in the Flink layer. As storage systems have developed, many
>>>>>>>>>>>>>>>> > >>>> > > > downstream storage systems of Flink SQL are now able to handle
>>>>>>>>>>>>>>>> > >>>> > > > aggregation themselves. Pushing aggregates down to the data source
>>>>>>>>>>>>>>>> > >>>> > > > layer will improve performance by reducing network IO and
>>>>>>>>>>>>>>>> > >>>> > > > computation overhead.
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > > > https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>>>>>> > >>>> > > >
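The performance argument above can be made concrete with a minimal sketch. This is not Flink or connector code: `Row`, `localAggregate`, and `globalAggregate` are hypothetical names, and the accumulator is assumed to be a simple (sum, count) pair. Each split is pre-aggregated per key at the "source", and only the partial results are merged by the engine, so fewer rows cross the network.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalAggPushDownSketch {

    /** Hypothetical raw input row: a grouping key and a numeric value. */
    static final class Row {
        final String key;
        final long value;

        Row(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** "Source side": pre-aggregates one split into {sum, count} per key. */
    static Map<String, long[]> localAggregate(List<Row> split) {
        Map<String, long[]> partials = new TreeMap<>();
        for (Row row : split) {
            long[] acc = partials.computeIfAbsent(row.key, k -> new long[2]);
            acc[0] += row.value; // running sum
            acc[1] += 1;         // running count
        }
        return partials;
    }

    /** "Engine side": merges the partial accumulators received from all splits. */
    static Map<String, long[]> globalAggregate(List<Map<String, long[]>> partialsPerSplit) {
        Map<String, long[]> result = new TreeMap<>();
        for (Map<String, long[]> partials : partialsPerSplit) {
            for (Map.Entry<String, long[]> entry : partials.entrySet()) {
                long[] acc = result.computeIfAbsent(entry.getKey(), k -> new long[2]);
                acc[0] += entry.getValue()[0];
                acc[1] += entry.getValue()[1];
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> split1 = Arrays.asList(new Row("a", 1), new Row("a", 2), new Row("b", 5));
        List<Row> split2 = Arrays.asList(new Row("a", 3), new Row("b", 7), new Row("b", 8));

        Map<String, long[]> p1 = localAggregate(split1);
        Map<String, long[]> p2 = localAggregate(split2);
        Map<String, long[]> total = globalAggregate(Arrays.asList(p1, p2));

        // Six raw rows are reduced to four partial rows before leaving the source.
        System.out.println("partial rows shipped: " + (p1.size() + p2.size()));
        for (Map.Entry<String, long[]> entry : total.entrySet()) {
            System.out.println(entry.getKey() + ": sum=" + entry.getValue()[0]
                    + ", count=" + entry.getValue()[1]);
        }
    }
}
```

Keeping both sum and count in the partial state is a design choice of this sketch: it lets the final stage derive SUM, COUNT, or AVG from the same merged accumulator.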
>>>>>>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy of Science
>>>>>>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>>>> > >>>> > > > E-mail: [hidden email] <[hidden email]>
>>>>>>>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>>>>>>