[jira] [Created] (FLINK-16345) Computed column can not refer time attribute column

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-16345) Computed column can not refer time attribute column

Shang Yuanchun (Jira)
Leonard Xu created FLINK-16345:
----------------------------------

             Summary: Computed column can not refer time attribute column
                 Key: FLINK-16345
                 URL: https://issues.apache.org/jira/browse/FLINK-16345
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.10.0
            Reporter: Leonard Xu


If a computed column refer a time attribute column, computed column will lose  time attribute and cause validation fail.
{code:java}
CREATE TABLE orders (
  order_id STRING,
  order_time TIMESTAMP(3),
  amount DOUBLE,
  amount_kg as amount * 1000,
  // can not select computed column standard_ts which from column order_time that used as WATERMARK
  standard_ts as order_time + INTERVAL '8' HOUR,
  WATERMARK FOR order_time AS order_time
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.10',
  'connector.topic' = 'flink_orders',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'testGroup',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

{code}
The query `select amount_kg from orders` runs normally, 

the` he query `select standard_ts from orders` throws a validation exception message as following:
{noformat}
[ERROR] Could not execute SQL statement. Reason:
 java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
 validated type:
 RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIMESTAMP(3) ts) NOT NULL
 converted type:
 RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIME ATTRIBUTE(ROWTIME) ts) NOT NULL
 rel:
 LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[$3], ts=[$4])
 LogicalWatermarkAssigner(rowtime=[order_time], watermark=[$1])
 LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[*($2, 1000)], ts=[+($1, 28800000:INTERVAL HOUR)])
 LogicalTableScan(table=[[default_catalog, default_database, orders, source: [Kafka010TableSource(order_id, order_time, amount)]]])
 {noformat}
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)