bianhaihua created FLINK-17317:
---------------------------------- Summary: Schema.rowtime() method not working correctly, throws 'Window aggregate can only be defined over a time attribute column' exception Key: FLINK-17317 URL: https://issues.apache.org/jira/browse/FLINK-17317 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.10.0 Reporter: bianhaihua part of pom: {quote} h4. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>1.10.0</version> </dependency> {quote} part of code: {quote}{{StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();}} {{ bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);}} {{EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();}} {{StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);}} {{ConnectorDescriptor kafkaConn = new Kafka().version("universal")}} {{ .topic(this.topic)}} {{ .startFromEarliest()}} {{ .properties(kafkaProps);}} {{kafkaConn.toProperties().forEach((k, v) -> logger.info("'{}'='{}'", k, v));}} {{String jsonSchema = }}{{"{\"type\":\"object\","}} {{ + "\"properties\":"}} {{ + "\{\"PONRXPower\":{\"type\":\"integer\"},"}} {{ + "\"actualTime\":\{\"type\": \"string\",\"format\": \"date-time\"},"}} {{ + "\"deviceId\":\{\"type\":\"string\"}}}";}} {{ FormatDescriptor jsonFormat = new 
Json().failOnMissingField(false)}} {{ .jsonSchema(jsonSchema);}} {{ Schema tableSchema = new Schema()}} {{ .field("actualTime", DataTypes.TIMESTAMP(3))}} {{ .rowtime(new Rowtime().timestampsFromField("actualTime").watermarksPeriodicAscending())}} {{ .field("deviceId", DataTypes.STRING())}} {{ .field("PONRXPower", DataTypes.BIGINT());}} {{bsTableEnv.connect(kafkaConn)}} {{ .withFormat(jsonFormat)}} {{ .withSchema(tableSchema)}} {{ .inAppendMode()}} {{ .createTemporaryTable("rxpower_detail");}} {{Table table2 = bsTableEnv.sqlQuery("select TUMBLE_ROWTIME(actualTime, INTERVAL '5' second) as win_end,"}} {{ + " deviceId, count(deviceId) as lc from rxpower_detail "}} {{ + " where PONRXPower< " + LOW_OPTICAL_POWER}} {{ + " GROUP BY TUMBLE(actualTime, INTERVAL '5' second), deviceId ");}} {{ DataStream<Tuple3<java.sql.Timestamp, String, Long>> resultStream = bsTableEnv.toAppendStream(table2,}} {{ TypeInformation.of(new TypeHint<Tuple3<java.sql.Timestamp, String, Long>>() {}} {{ }));}} {{resultStream.print();}} {{try {}} {{ bsTableEnv.execute("table api test");}} {{ } catch (Exception e) {}} {{ logger.error(e.getMessage(), e);}} {{ }}} {quote} exceptions: {quote}{{^Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. 
at org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:51) at org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:79) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:94) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)^}} {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |