Benchao Li created FLINK-15421:
----------------------------------

             Summary: GroupAggsHandler throws java.time.LocalDateTime cannot be cast to java.sql.Timestamp
                 Key: FLINK-15421
                 URL: https://issues.apache.org/jira/browse/FLINK-15421
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.9.1, 1.10.0
            Reporter: Benchao Li


`TimestampType` has two physical representations: `Timestamp` and `LocalDateTime`. With the following SQL, the two representations conflict with each other:
{quote}
SELECT
  SUM(cnt) as s,
  MAX(ts)
FROM
  (SELECT
    `string`,
    `int`,
    COUNT(*) AS cnt,
    MAX(rowtime) as ts
  FROM T1
  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
GROUP BY `string`
{quote}
with 'table.exec.emit.early-fire.enabled' = true.

The exception is below:
{quote}
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
    at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
    at java.lang.Thread.run(Thread.java:748)
{quote}

I also created a unit test in `WindowAggregateITCase` to quickly reproduce this bug:
{quote}
  @Test
  def testEarlyFireWithTumblingWindow(): Unit = {
    val stream = failingDataSource(data)
      .assignTimestampsAndWatermarks(
        new TimestampAndWatermarkWithOffset
          [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
    val table = stream.toTable(tEnv,
      'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
    tEnv.registerTable("T1", table)
    tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled", true)
    tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", "1000 ms")

    val sql =
      """
        |SELECT
        |  SUM(cnt) as s,
        |  MAX(ts)
        |FROM
        |  (SELECT
        |    `string`,
        |    `int`,
        |    COUNT(*) AS cnt,
        |    MAX(rowtime) as ts
        |  FROM T1
        |  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
        |GROUP BY `string`
        |""".stripMargin

    tEnv.sqlQuery(sql).toRetractStream[Row].print()
    env.execute()
  }
{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
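
For illustration, the failure mode described in this issue can be reproduced outside Flink in plain Java. The sketch below is not Flink code. `TimestampCastDemo`, `unsafeCast`, and `toSqlTimestamp` are hypothetical names, the date is arbitrary, and the defensive conversion is only one possible way to accept both representations, not necessarily how the bug is fixed in Flink.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampCastDemo {

    // Mimics code that assumes the java.sql.Timestamp representation.
    // Throws ClassCastException when handed a java.time.LocalDateTime.
    public static Timestamp unsafeCast(Object value) {
        return (Timestamp) value;
    }

    // Hypothetical defensive conversion that accepts either representation.
    public static Timestamp toSqlTimestamp(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Timestamp) {
            return (Timestamp) value;
        }
        if (value instanceof LocalDateTime) {
            // Standard JDK conversion, available since Java 8.
            return Timestamp.valueOf((LocalDateTime) value);
        }
        throw new IllegalArgumentException(
            "Unsupported timestamp representation: " + value.getClass().getName());
    }

    public static void main(String[] args) {
        // Arbitrary example value held as the LocalDateTime representation.
        Object ts = LocalDateTime.of(2019, 12, 27, 10, 0, 0);

        try {
            unsafeCast(ts);
        } catch (ClassCastException e) {
            System.out.println("Cast failed: " + e.getMessage());
        }

        System.out.println("Converted: " + toSqlTimestamp(ts));
    }
}
```

The point of the sketch is that the two classes are unrelated in the Java type hierarchy, so a value produced under one representation cannot simply be cast by code generated for the other. It has to be converted explicitly, or both sides have to agree on one representation.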