gkgkgk created FLINK-16622:
------------------------------

             Summary: Type ARRAY<STRING> of table field 'id' does not match with the physical type LEGACY('ARRAY', 'ANY<[Ljava.lang.String;, rO0ABXNyAD......>') of the 'id' field of the TableSource return type.
                 Key: FLINK-16622
                 URL: https://issues.apache.org/jira/browse/FLINK-16622
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.10.0
            Reporter: gkgkgk


When I define an ARRAY<VARCHAR> field in the table schema and declare the same field in 'format.json-schema' like this:

'format.json-schema' = '{
  "type": "object",
  "properties": {
    "id": {
      "type": "array",
      "items": {"type": "string"}
    }
  }
}'

the following error occurs:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ARRAY<STRING> of table field 'id' does not match with the physical type LEGACY('ARRAY', 'ANY<[Ljava.lang.String;, rO0ABXNyADdvc......>') of the 'id' field of the TableSource return type.

The following is my DDL:

-- source
CREATE TABLE dwd_user_log (
  id ARRAY<VARCHAR>,
  ctime TIMESTAMP(3),
  pageId VARCHAR,
  deviceId VARCHAR,
  WATERMARK FOR ctime AS ctime - INTERVAL '10' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'dev_dwd_user_log_02',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = 'node14.example.com:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = 'node14.example.com:9092',
  'connector.properties.2.key' = 'group.id',
  'connector.properties.2.value' = 'dev-group',
  'update-mode' = 'append',
  'format.type' = 'json',
  -- 'format.derive-schema' = 'true'
  'format.json-schema' = '{
    "type": "object",
    "properties": {
      "id": {
        "type": "array",
        "items": {"type": "string"}
      },
      "ctime": {
        "type": "string",
        "format": "date-time"
      },
      "pageId": {
        "type": "string"
      },
      "deviceId": {
        "type": "string"
      }
    }
  }'
  -- 'schema.1.rowtime.timestamps.type' = 'from-field',
  -- 'schema.1.rowtime.timestamps.from' = 'ctime',
  -- 'schema.1.rowtime.watermarks.type' = 'periodic-bounded',
  -- 'schema.1.rowtime.watermarks.delay' = '10000'
  -- 'schema.1.from' = 'ctime'
);

-- sink
-- sink for pv
CREATE TABLE dws_pv (
  windowStart TIMESTAMP(3),
  windowEnd TIMESTAMP(3),
  pageId VARCHAR,
  id ARRAY<VARCHAR>,
  viewCount BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'dev_dws_pvuv_02',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = 'node14.example.com:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = 'node14.example.com:9092',
  'connector.properties.2.key' = 'group.id',
  'connector.properties.2.value' = 'dev-group',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

-- pv
INSERT INTO dws_pv
SELECT
  TUMBLE_START(ctime, INTERVAL '20' SECOND) AS windowStart,
  TUMBLE_END(ctime, INTERVAL '20' SECOND) AS windowEnd,
  pageId,
  id,
  COUNT(deviceId) AS viewCount
FROM dwd_user_log
GROUP BY TUMBLE(ctime, INTERVAL '20' SECOND), pageId, id;



--
This message was sent by Atlassian Jira
(v8.3.4#803005)