Dawid Wysakowicz created FLINK-15631:
---------------------------------------- Summary: Cannot use generic types as the result of an AggregateFunction in Blink planner Key: FLINK-15631 URL: https://issues.apache.org/jira/browse/FLINK-15631 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.9.0, 1.10.0 Reporter: Dawid Wysakowicz It is not possible to use a GenericTypeInfo for a result type of an {{AggregateFunction}} in a retract mode with state cleaning disabled. {code} @Test def testGenericTypes(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val tEnv = StreamTableEnvironment.create(env, setting) val t = env.fromElements(1, 2, 3).toTable(tEnv, 'a) val results = t .select(new GenericAggregateFunction()('a)) .toRetractStream[Row] val sink = new TestingRetractSink results.addSink(sink).setParallelism(1) env.execute() } class RandomClass(var i: Int) class GenericAggregateFunction extends AggregateFunction[java.lang.Integer, RandomClass] { override def getValue(accumulator: RandomClass): java.lang.Integer = accumulator.i override def createAccumulator(): RandomClass = new RandomClass(0) override def getResultType: TypeInformation[java.lang.Integer] = new GenericTypeInfo[Integer](classOf[Integer]) override def getAccumulatorType: TypeInformation[RandomClass] = new GenericTypeInfo[RandomClass]( classOf[RandomClass]) def accumulate(acc: RandomClass, value: Int): Unit = { acc.i = value } def retract(acc: RandomClass, value: Int): Unit = { acc.i = value } def resetAccumulator(acc: RandomClass): Unit = { acc.i = 0 } } {code} The code above fails with: {code} Caused by: java.lang.UnsupportedOperationException: BinaryGeneric cannot be compared at org.apache.flink.table.dataformat.BinaryGeneric.equals(BinaryGeneric.java:77) at GroupAggValueEqualiser$17.equalsWithoutHeader(Unknown Source) at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:177) at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:170) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527) at java.lang.Thread.run(Thread.java:748) {code} This is related to FLINK-13702 -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |