[jira] [Created] (FLINK-15631) Cannot use generic types as the result of an AggregateFunction in Blink planner

Shang Yuanchun (Jira)
Dawid Wysakowicz created FLINK-15631:
----------------------------------------

             Summary: Cannot use generic types as the result of an AggregateFunction in Blink planner
                 Key: FLINK-15631
                 URL: https://issues.apache.org/jira/browse/FLINK-15631
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.9.0, 1.10.0
            Reporter: Dawid Wysakowicz


It is not possible to use a {{GenericTypeInfo}} as the result type of an {{AggregateFunction}} in retract mode when state cleaning is disabled.

{code}

  @Test
  def testGenericTypes(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, setting)
    val t = env.fromElements(1, 2, 3).toTable(tEnv, 'a)

    val results = t
      .select(new GenericAggregateFunction()('a))
      .toRetractStream[Row]

    val sink = new TestingRetractSink
    results.addSink(sink).setParallelism(1)
    env.execute()
  }

class RandomClass(var i: Int)

class GenericAggregateFunction extends AggregateFunction[java.lang.Integer, RandomClass] {
  override def getValue(accumulator: RandomClass): java.lang.Integer = accumulator.i

  override def createAccumulator(): RandomClass = new RandomClass(0)

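  // Declaring the result type as a GenericTypeInfo is what triggers the failure.
  override def getResultType: TypeInformation[java.lang.Integer] =
    new GenericTypeInfo[java.lang.Integer](classOf[java.lang.Integer])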

  override def getAccumulatorType: TypeInformation[RandomClass] = new GenericTypeInfo[RandomClass](
    classOf[RandomClass])

  def accumulate(acc: RandomClass, value: Int): Unit = {
    acc.i = value
  }

  def retract(acc: RandomClass, value: Int): Unit = {
    acc.i = value
  }

  def resetAccumulator(acc: RandomClass): Unit = {
    acc.i = 0
  }
}
{code}

The code above fails with:

{code}
Caused by: java.lang.UnsupportedOperationException: BinaryGeneric cannot be compared
        at org.apache.flink.table.dataformat.BinaryGeneric.equals(BinaryGeneric.java:77)
        at GroupAggValueEqualiser$17.equalsWithoutHeader(Unknown Source)
        at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:177)
        at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:170)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
        at java.lang.Thread.run(Thread.java:748)
{code}
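
The trace suggests that the operator compares the previous aggregate result with the new one, and that this comparison is unsupported for generic values. The following is a minimal sketch of that reading in plain Scala. It is not Flink code: {{OpaqueValue}} and {{DedupingAggregator}} are hypothetical names introduced here for illustration, and the check on {{stateCleaningEnabled}} is an assumption taken from the condition described above.

```scala
// Simplified, hypothetical model of the failure; none of these types are Flink classes.

// Stand-in for a value kept in an opaque serialized form: equality is unsupported.
final class OpaqueValue(val bytes: Array[Byte]) {
  override def equals(other: Any): Boolean =
    throw new UnsupportedOperationException("OpaqueValue cannot be compared")

  override def hashCode(): Int = java.util.Arrays.hashCode(bytes)
}

// Stand-in for an aggregation operator that remembers the last emitted result and,
// when state cleaning is disabled, suppresses an update equal to the previous one.
final class DedupingAggregator[T](stateCleaningEnabled: Boolean) {
  private var previous: Option[T] = None

  /** Returns the messages emitted for `current` as (isAccumulate, value) pairs. */
  def update(current: T): Seq[(Boolean, T)] = {
    val emitted: Seq[(Boolean, T)] = previous match {
      // First result: nothing to compare against, emit it.
      case None => Seq((true, current))
      // Unchanged result: emit nothing. This is the only place values are compared.
      case Some(prev) if !stateCleaningEnabled && prev == current => Seq.empty
      // Changed result: retract the old value, then emit the new one.
      case Some(prev) => Seq((false, prev), (true, current))
    }
    previous = Some(current)
    emitted
  }
}
```

In this model the first {{update}} always succeeds because no comparison takes place. A second {{update}} with a distinct {{OpaqueValue}} instance throws {{UnsupportedOperationException}} when {{stateCleaningEnabled}} is false, while the same sequence with a comparable type such as {{Int}} works as expected.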

This is related to FLINK-13702.


