[jira] [Created] (FLINK-15943) Rowtime field name cannot be the same as the json field

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-15943) Rowtime field name cannot be the same as the json field

Shang Yuanchun (Jira)
gkgkgk created FLINK-15943:
------------------------------

             Summary: Rowtime field name cannot be the same as the json field
                 Key: FLINK-15943
                 URL: https://issues.apache.org/jira/browse/FLINK-15943
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API, Table SQL / Planner
    Affects Versions: 1.9.0
            Reporter: gkgkgk


Run the following SQL:

-- sql start 
--source
-- Source table: user-log events read from Kafka topic 'dev_dwd_user_log_02'
-- in JSON format. A rowtime attribute is configured on ctime and is read from
-- the JSON field of the same name ('ctime'); that name clash is the setup this
-- report is about.
CREATE TABLE dwd_user_log (
  id VARCHAR,
  ctime TIMESTAMP,
  sessionId VARCHAR,
  pageId VARCHAR,
  eventId VARCHAR,
  deviceId Decimal
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'dev_dwd_user_log_02',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = 'node01:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = 'node01:9092',
  'connector.properties.2.key' = 'group.id',
  'connector.properties.2.value' = 'dev-group',
  'update-mode' = 'append',
  'format.type' = 'json',
  -- Schema derivation is left disabled; the explicit JSON schema below is
  -- supplied instead. In it, ctime is a string with format "date-time".
  -- 'format.derive-schema' = 'true',
  'format.json-schema' = '{
    "type": "object",
    "properties": {
      "id": {
      "type": "string"
      },
      "ctime": {
      "type": "string",
      "format": "date-time"
      },
      "pageId": {
      "type": "string"
      },
      "eventId": {
      "type": "string"
      },
      "sessionId": {
      "type": "string"
      },
      "deviceId": {
      "type": "number"
      }
    }
  }',
  -- Rowtime definition for schema field index 1 (presumably the second column,
  -- ctime, if the index is zero-based; confirm). Timestamps are taken from the
  -- input field 'ctime'; watermarks are periodic-bounded with a delay of 10000
  -- (unit not stated here; presumably milliseconds, confirm against the Flink docs).
  'schema.1.rowtime.timestamps.type' = 'from-field',
  'schema.1.rowtime.timestamps.from' = 'ctime',
  'schema.1.rowtime.watermarks.type' = 'periodic-bounded',
  'schema.1.rowtime.watermarks.delay' = '10000'
);  


-- Sink table for the page-view (pv) aggregation: receives the window bounds,
-- the page id and the view count, and writes them as JSON to Kafka topic
-- 'dev_dws_pvuv_02'. No explicit JSON schema is given; schema derivation is
-- switched on via 'format.derive-schema'.
CREATE TABLE dws_pv (
    windowStart  TIMESTAMP,
    windowEnd    TIMESTAMP,
    pageId       VARCHAR,
    viewCount    BIGINT
)
WITH (
    'connector.type'               = 'kafka',
    'connector.version'            = 'universal',
    'connector.topic'              = 'dev_dws_pvuv_02',
    'connector.startup-mode'       = 'latest-offset',
    'connector.properties.0.key'   = 'zookeeper.connect',
    'connector.properties.0.value' = 'node01:2181',
    'connector.properties.1.key'   = 'bootstrap.servers',
    'connector.properties.1.value' = 'node01:9092',
    'connector.properties.2.key'   = 'group.id',
    'connector.properties.2.value' = 'dev-group',
    'update-mode'                  = 'append',
    'format.type'                  = 'json',
    'format.derive-schema'         = 'true'
);

-- Page views: for every pageId, count the non-null deviceId values inside
-- each 20-second tumbling window over ctime, and emit the window bounds
-- alongside the count.
INSERT INTO dws_pv
SELECT
    TUMBLE_START(ctime, INTERVAL '20' SECOND) AS windowStart,
    TUMBLE_END(ctime, INTERVAL '20' SECOND)   AS windowEnd,
    pageId,
    COUNT(deviceId)                           AS viewCount
FROM dwd_user_log
GROUP BY
    TUMBLE(ctime, INTERVAL '20' SECOND),
    pageId;
-- sql end
It fails with the following error:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'ctime' could not be resolved by the field mapping.
    at org.apache.flink.table.planner.sources.TableSourceUtil$.org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:357)
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:388)
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:388)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.flink.table.planner.sources.TableSourceUtil$.org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:388)
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:275)
    at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:270)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:270)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:117)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)