[jira] [Created] (FLINK-5433) initiate function of Aggregate does not take effect for DataStream aggregation

Shang Yuanchun (Jira)
Shaoxuan Wang created FLINK-5433:
------------------------------------

             Summary: initiate function of Aggregate does not take effect for DataStream aggregation
                 Key: FLINK-5433
                 URL: https://issues.apache.org/jira/browse/FLINK-5433
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Shaoxuan Wang


The initiate function of Aggregate takes effect for DataSet aggregation, but has no effect on DataStream aggregation.

For instance, when CountAggregate is given an initial value of 2, the result of the DataSet aggregation reflects the change, but the result of the DataStream aggregation does not.
{code}
class CountAggregate extends Aggregate[Long] {
  override def initiate(intermediate: Row): Unit = {
    // Start the count at 2 so the effect of initiate shows up in the test results.
    intermediate.setField(countIndex, 2L)
  }
}
{code}

With this change, the DataSet test (testWorkingAggregationDataTypes) produces:
.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
 expected: [1,1,1,1,1.5,1.5,2]
 received: [1,1,1,1,1.5,1.5,4] (the last count aggregate exceeds the expected value by 2, as intended)

But the output of the DataStream test (testProcessingTimeSlidingGroupWindowOverCount) is unchanged:
.select('string, 'int.count, 'int.avg)
Expected :List(Hello world,1,3, Hello world,2,3, Hello,1,2, Hello,2,2, Hi,1,1)
Actual   :MutableList(Hello world,1,3, Hello world,2,3, Hello,1,2, Hello,2,2, Hi,1,1)
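
The difference between the two outputs can be reproduced in isolation with a small sketch. This is only an illustration of the symptom, not a diagnosis of Flink's code: Row, CountAggregate, countWithInitiate and countWithoutInitiate below are hypothetical stand-ins written for this example, and the assumption that one code path never calls initiate is a guess at how such a result could arise.

```scala
object InitiateSketch {
  // Hypothetical stand-in for a row of intermediate aggregate fields (not Flink's Row).
  final class Row(arity: Int) {
    private val fields: Array[Any] = Array.fill[Any](arity)(0L)
    def setField(i: Int, v: Any): Unit = { fields(i) = v }
    def getField(i: Int): Any = fields(i)
  }

  // Hypothetical count aggregate whose initiate seeds the count with 2, as in the report.
  class CountAggregate(countIndex: Int) {
    def initiate(intermediate: Row): Unit =
      intermediate.setField(countIndex, 2L)

    def accumulate(intermediate: Row): Unit = {
      val current = intermediate.getField(countIndex).asInstanceOf[Long]
      intermediate.setField(countIndex, current + 1L)
    }

    def evaluate(intermediate: Row): Long =
      intermediate.getField(countIndex).asInstanceOf[Long]
  }

  // Path that calls initiate before accumulating: the seed value is visible in the result.
  def countWithInitiate(agg: CountAggregate, n: Int): Long = {
    val buffer = new Row(1)
    agg.initiate(buffer)
    (1 to n).foreach(_ => agg.accumulate(buffer))
    agg.evaluate(buffer)
  }

  // Assumed faulty path: initiate is never called, so the buffer keeps its default of 0.
  def countWithoutInitiate(agg: CountAggregate, n: Int): Long = {
    val buffer = new Row(1)
    (1 to n).foreach(_ => agg.accumulate(buffer))
    agg.evaluate(buffer)
  }
}
```

Counting two elements gives 4 on the first path and 2 on the second, which mirrors the DataSet result (2 became 4) and the unchanged DataStream result.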




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)