[jira] [Created] (FLINK-11136) Fix the logical of merge for DISTINCT aggregates

Shang Yuanchun (Jira)
Dian Fu created FLINK-11136:
-------------------------------

             Summary: Fix the logical of merge for DISTINCT aggregates
                 Key: FLINK-11136
                 URL: https://issues.apache.org/jira/browse/FLINK-11136
             Project: Flink
          Issue Type: Test
          Components: Table API & SQL
            Reporter: Dian Fu
            Assignee: Dian Fu


The merge logic for DISTINCT aggregates has a bug. For the following query:
{code:sql}
SELECT
  c,
  COUNT(DISTINCT b),
  SUM(DISTINCT b),
  SESSION_END(rowtime, INTERVAL '0.005' SECOND)
FROM MyTable
GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c
{code}
the following exception will be thrown:
{code:java}
Caused by: java.lang.ClassCastException: org.apache.flink.types.Row cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:58)
at org.apache.flink.table.functions.aggfunctions.SumAggFunction.accumulate(SumAggFunction.scala:50)
at GroupingWindowAggregateHelper$18.mergeAccumulatorsPair(Unknown Source)
at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:66)
at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.merge(AggregateAggFunction.scala:33)
at org.apache.flink.runtime.state.heap.HeapAggregatingState.mergeState(HeapAggregatingState.java:117)
at org.apache.flink.runtime.state.heap.AbstractHeapMergingState$MergeTransformation.apply(AbstractHeapMergingState.java:102)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:463)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
at org.apache.flink.runtime.state.heap.AbstractHeapMergingState.mergeNamespaces(AbstractHeapMergingState.java:91)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:341)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:745)
{code}
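
The trace shows SumAggFunction.accumulate receiving a Row where it expects an Integer, reached from mergeAccumulatorsPair while session windows are merged. One plausible reading is that the distinct accumulator keeps its seen values wrapped in rows, and the merge path hands the whole row to the inner aggregate instead of the field inside it. The following is a minimal, self-contained sketch of that failure mode under that assumption. It is not Flink's generated code: DistinctMergeSketch, SumAcc, DistinctAcc, mergeBuggy and mergeFixed are all hypothetical names, and a single-element List stands in for org.apache.flink.types.Row.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the generated helper; NOT Flink's actual code. */
final class DistinctMergeSketch {

    /** Simplified SUM accumulator that only accepts Integer inputs. */
    static final class SumAcc {
        int sum;

        void accumulate(Object value) {
            // Like the unboxing in the trace, this cast fails for non-Integer input.
            sum += (Integer) value;
        }
    }

    /** Distinct wrapper: maps each seen "row" (a one-field List here) to a count. */
    static final class DistinctAcc {
        final Map<List<Object>, Long> seen = new HashMap<>();
        final SumAcc inner = new SumAcc();

        void accumulate(Object value) {
            List<Object> key = Collections.singletonList(value);
            boolean firstSeen = !seen.containsKey(key);
            seen.merge(key, 1L, Long::sum);
            if (firstSeen) {
                inner.accumulate(value); // only new distinct values reach SUM
            }
        }
    }

    /** Assumed buggy merge: forwards the wrapped row to the inner aggregate. */
    static void mergeBuggy(DistinctAcc target, DistinctAcc other) {
        for (Map.Entry<List<Object>, Long> e : other.seen.entrySet()) {
            boolean firstSeen = !target.seen.containsKey(e.getKey());
            target.seen.merge(e.getKey(), e.getValue(), Long::sum);
            if (firstSeen) {
                target.inner.accumulate(e.getKey()); // throws ClassCastException
            }
        }
    }

    /** Corrected merge: unwraps the field before calling the inner aggregate. */
    static void mergeFixed(DistinctAcc target, DistinctAcc other) {
        for (Map.Entry<List<Object>, Long> e : other.seen.entrySet()) {
            boolean firstSeen = !target.seen.containsKey(e.getKey());
            target.seen.merge(e.getKey(), e.getValue(), Long::sum);
            if (firstSeen) {
                target.inner.accumulate(e.getKey().get(0));
            }
        }
    }

    public static void main(String[] args) {
        DistinctAcc a = new DistinctAcc();
        a.accumulate(1);
        a.accumulate(2);
        DistinctAcc b = new DistinctAcc();
        b.accumulate(2);
        b.accumulate(3);
        mergeFixed(a, b);
        System.out.println("SUM(DISTINCT b) after merge = " + a.inner.sum);
    }
}
```

In this sketch the only difference between the two merge variants is whether the key is unwrapped before it reaches the inner accumulator. Whether the actual fix takes this shape depends on the code generator and is not confirmed by this report.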


