gkgkgk created FLINK-15943:
------------------------------ Summary: Rowtime field name cannot be the same as the json field Key: FLINK-15943 URL: https://issues.apache.org/jira/browse/FLINK-15943 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Planner Affects Versions: 1.9.0 Reporter: gkgkgk Run the following sql: -- sql start --source CREATE TABLE dwd_user_log ( id VARCHAR, ctime TIMESTAMP, sessionId VARCHAR, pageId VARCHAR, eventId VARCHAR, deviceId Decimal ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'dev_dwd_user_log_02', 'connector.startup-mode' = 'latest-offset', 'connector.properties.0.key' = 'zookeeper.connect', 'connector.properties.0.value' = 'node01:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'node01:9092', 'connector.properties.2.key' = 'group.id', 'connector.properties.2.value' = 'dev-group', 'update-mode' = 'append', 'format.type' = 'json', -- 'format.derive-schema' = 'true', 'format.json-schema' = '{ "type": "object", "properties": { "id": { "type": "string" }, "ctime": { "type": "string", "format": "date-time" }, "pageId": { "type": "string" }, "eventId": { "type": "string" }, "sessionId": { "type": "string" }, "deviceId": { "type": "number" } } }', 'schema.1.rowtime.timestamps.type' = 'from-field', 'schema.1.rowtime.timestamps.from' = 'ctime', 'schema.1.rowtime.watermarks.type' = 'periodic-bounded', 'schema.1.rowtime.watermarks.delay' = '10000' ); -- sink -- sink for pv CREATE TABLE dws_pv ( windowStart TIMESTAMP, windowEnd TIMESTAMP, pageId VARCHAR, viewCount BIGINT ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'dev_dws_pvuv_02', 'connector.startup-mode' = 'latest-offset', 'connector.properties.0.key' = 'zookeeper.connect', 'connector.properties.0.value' = 'node01:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'node01:9092', 'connector.properties.2.key' = 'group.id', 'connector.properties.2.value' = 'dev-group', 'update-mode' = 'append', 'format.type' = 'json', 'format.derive-schema' = 'true' ); -- pv INSERT INTO dws_pv SELECT TUMBLE_START(ctime, INTERVAL '20' SECOND) AS windowStart, TUMBLE_END(ctime, INTERVAL '20' SECOND) AS windowEnd, pageId, COUNT(deviceId) AS viewCount FROM dwd_user_log GROUP BY TUMBLE(ctime, INTERVAL '20' SECOND),pageId; -- sql end And hit the following error: {code:java} //Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'ctime' could not be resolved by the field mapping.Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'ctime' could not be resolved by the field mapping. at org.apache.flink.table.planner.sources.TableSourceUtil$.org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:357) at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:388) at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:388) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.flink.table.planner.sources.TableSourceUtil$.org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:388) at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:275) at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:270) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:270) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:117) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |