[jira] [Created] (FLINK-21923) SplitAggregateRule will be abnormal, when the sum/count and avg in SQL at the same time

Shang Yuanchun (Jira)
tartarus created FLINK-21923:
--------------------------------

             Summary: SplitAggregateRule will be abnormal, when the sum/count and avg in SQL at the same time
                 Key: FLINK-21923
                 URL: https://issues.apache.org/jira/browse/FLINK-21923
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.10.0
            Reporter: tartarus
             Fix For: 1.13.0


SplitAggregateRule rewrites a one-layer aggregation into a two-layer (partial and final) aggregation to improve computing performance under data skew.
In the partial phase, avg is translated into sum and count. If the original SQL already contains a count, the engine removes the duplicate count and then adds a Project on top of the aggregate to compute and restore the output columns from the de-duplicated result.
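
To make the de-duplication concrete, here is a minimal sketch in plain Scala. It is a simplified model, not the planner's code: `AggCall`, `AvgSplitSketch`, `expand` and `plan` are hypothetical names introduced only for illustration. It shows why the partial aggregate ends up with fewer calls than the query has output columns, so an extra projection is needed to map them back.

```scala
// Simplified model, not Flink's planner code: an aggregate call is just a
// function name plus the column it reads.
final case class AggCall(func: String, column: String)

object AvgSplitSketch {
  // Expand AVG(x) into SUM(x) and COUNT(x); every other call is kept as-is.
  def expand(call: AggCall): Seq[AggCall] = call match {
    case AggCall("AVG", col) => Seq(AggCall("SUM", col), AggCall("COUNT", col))
    case other               => Seq(other)
  }

  // Returns the de-duplicated partial calls and, for each original call,
  // the positions in the partial call list that it reads from.
  def plan(calls: Seq[AggCall]): (Seq[AggCall], Seq[Seq[Int]]) = {
    val partial = calls.flatMap(expand).distinct
    val mapping = calls.map(c => expand(c).map(p => partial.indexOf(p)))
    (partial, mapping)
  }
}
```

For `count(b), AVG(b), sum(b)` this model keeps only two partial calls, `COUNT(b)` and `SUM(b)`, while the query still expects three output columns. The mapping is what a projection on top of the aggregate would have to apply.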
{code:java}
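    // Build the partial aggregate over the full group set.
    relBuilder.aggregate(
      relBuilder.groupKey(fullGroupSet, ImmutableList.of[ImmutableBitSet](fullGroupSet)),
      newPartialAggCalls)
    // Assumes the node on top of the builder stack is the aggregate just built.
    relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
      .setPartialFinalType(PartialFinalType.PARTIAL)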
{code}
Because of the added Project, `relBuilder.peek()` in the code above returns a `FlinkLogicalCalc` instead of a `FlinkLogicalAggregate`, and the cast throws an exception like the following:
{code:java}
java.lang.ClassCastException: org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be cast to org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate

        at org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRule.onMatch(SplitAggregateRule.scala:286)
        at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
        at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
        at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
        at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
        at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
        at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyPlan(TableTestBase.scala:283)
        at org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRuleTest.testAggWithFilterClause2(SplitAggregateRuleTest.scala:205)
{code}
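
The failure mode can be reproduced in isolation with a small sketch. Everything here is hypothetical: `Node`, `Aggregate`, `Calc` and `PeekSketch` are stand-ins for the planner classes, and `findAggregate` is just one possible defensive pattern, not necessarily the fix adopted for this issue. The sketch assumes the builder wraps the aggregate in a projection whenever duplicate calls were removed.

```scala
// Hypothetical stand-ins for the planner nodes; only the shape matters.
sealed trait Node
final case class Aggregate(calls: Seq[String]) extends Node
final case class Calc(input: Node) extends Node

object PeekSketch {
  // Mimics a builder that removes duplicate calls and, when it did so,
  // puts a projection on top to restore the original columns.
  def aggregate(calls: Seq[String]): Node = {
    val distinctCalls = calls.distinct
    val agg = Aggregate(distinctCalls)
    if (distinctCalls.size < calls.size) Calc(agg) else agg
  }

  // Instead of casting the top node blindly, look through any projection
  // to find the aggregate underneath.
  def findAggregate(node: Node): Option[Aggregate] = node match {
    case agg: Aggregate => Some(agg)
    case Calc(input)    => findAggregate(input)
  }
}
```

With duplicates in the call list, `PeekSketch.aggregate` returns a `Calc`, so an unconditional `asInstanceOf[Aggregate]` on the result would fail in the same way as the stack trace above, while `findAggregate` still reaches the aggregate.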
The problem can be reproduced reliably by adding the following test case to `SplitAggregateRuleTest`:

{code:java}
  @Test
  def testAggBothWithAvgAndCount(): Unit = {
    util.tableEnv.getConfig.getConfiguration.setBoolean(
      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
    val sqlQuery =
      s"""
         |SELECT
         |  COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
         |  SUM(b) FILTER (WHERE NOT b = 5),
         |  count(b),
         |  AVG(b),
         |  sum(b)
         |FROM MyTable
         |GROUP BY a
       """.stripMargin
    util.verifyPlan(sqlQuery)
  }
{code}




