Leonard Xu created FLINK-16345:
----------------------------------
Summary: Computed column can not refer time attribute column
Key: FLINK-16345
URL:
https://issues.apache.org/jira/browse/FLINK-16345 Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Leonard Xu
If a computed column refer a time attribute column, computed column will lose time attribute and cause validation fail.
{code:java}
CREATE TABLE orders (
order_id STRING,
order_time TIMESTAMP(3),
amount DOUBLE,
amount_kg as amount * 1000,
// can not select computed column standard_ts which from column order_time that used as WATERMARK
standard_ts as order_time + INTERVAL '8' HOUR,
WATERMARK FOR order_time AS order_time
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'flink_orders',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'connector.startup-mode' = 'earliest-offset',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
{code}
The query `select amount_kg from orders` runs normally,
the` he query `select standard_ts from orders` throws a validation exception message as following:
{noformat}
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIMESTAMP(3) ts) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIME ATTRIBUTE(ROWTIME) ts) NOT NULL
rel:
LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[$3], ts=[$4])
LogicalWatermarkAssigner(rowtime=[order_time], watermark=[$1])
LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[*($2, 1000)], ts=[+($1, 28800000:INTERVAL HOUR)])
LogicalTableScan(table=[[default_catalog, default_database, orders, source: [Kafka010TableSource(order_id, order_time, amount)]]])
{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)