Leonard Xu created FLINK-19878:
----------------------------------
Summary: Improve watermark ChangelogNormalize for upsertSource
Key: FLINK-19878
URL: https://issues.apache.org/jira/browse/FLINK-19878
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Leonard Xu
Currently, for an upsertSource such as upsert-kafka, the WatermarkAssigner is placed after the ChangelogNormalize node, and it may return Long.MaxValue as the watermark if some parallel subtasks have no data.
As an improvement, we can move the WatermarkAssigner to directly after the TableSourceScan node, so that watermarks are produced the same way as for a general source.
{code:java}
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
   +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I,UA,D])
      +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
         +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
{code}
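For illustration, the plan after the proposed change might look roughly like the sketch below. This is not actual planner output: the node attributes are copied from the plan above, and the changelogMode shown on the moved WatermarkAssigner ([UA,D], inherited from the scan) is an assumption. Any Exchange above ChangelogNormalize is omitted, since whether the planner would keep it is unknown.
{code:java}
+- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
   +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[UA,D])
         +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
{code}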
--
This message was sent by Atlassian Jira
(v8.3.4#803005)