Jark Wu created FLINK-20289:
-------------------------------
Summary: Computed columns can be calculated after ChangelogNormalize to reduce shuffle
Key: FLINK-20289
URL: https://issues.apache.org/jira/browse/FLINK-20289
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Jark Wu
In FLINK-19878, we changed the plan so that ChangelogNormalize is applied after WatermarkAssigner. This keeps the watermark close to the source and makes it more fine-grained.
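However, in some cases this shuffles more data than necessary, because all computed column expressions are evaluated before ChangelogNormalize, and therefore before the Exchange it requires. In the following example, {{a+1}} could be evaluated after ChangelogNormalize instead, so that column {{b}} does not have to be shuffled.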
{code:sql}
CREATE TABLE src (
  id STRING,
  a INT,
  b AS a + 1,
  c STRING,
  ts AS TO_TIMESTAMP(c),
  PRIMARY KEY (id) NOT ENFORCED,
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
  'connector' = 'values',
  'changelog-mode' = 'UA,D'
);

SELECT a, b, c FROM src WHERE a > 1;
{code}
{code}
Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
   +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
      +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
         +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
{code}
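To make the proposal concrete, below is a hand-edited sketch of the plan we would expect if {{a+1}} were evaluated after ChangelogNormalize. It is derived from the plan above and is not actual planner output. The lower Calc still computes {{ts}}, because the WatermarkAssigner needs it, but no longer computes {{b}}, so the Exchange does not carry that column. The upper Calc, which already runs after ChangelogNormalize, computes {{b}} instead.

{code}
Calc(select=[a, +(a, 1) AS b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
+- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
   +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
      +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
         +- Calc(select=[id, a, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
{code}

Only computed columns that the watermark definition depends on (here {{ts}}) would have to stay below the Exchange in this shape.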
--
This message was sent by Atlassian Jira
(v8.3.4#803005)