[jira] [Created] (FLINK-19878) Improve watermark ChangelogNormalize for upsertSource



Shang Yuanchun (Jira)
Leonard Xu created FLINK-19878:
----------------------------------

             Summary: Improve watermark ChangelogNormalize for upsertSource
                 Key: FLINK-19878
                 URL: https://issues.apache.org/jira/browse/FLINK-19878
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / Planner
            Reporter: Leonard Xu


Currently, for an upsert source such as upsert-kafka, the WatermarkAssigner is placed after the ChangelogNormalize node. As a result, it may return Long.MaxValue as the watermark if some parallel instances do not receive any data.

As an improvement, we can move the WatermarkAssigner to directly after the TableSourceScan node, so that watermarks are generated in the same way as for a regular source.

 
{code:java}
   +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I,UA,D])
         +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
            +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
               +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
{code}
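For background only, the following is a minimal sketch of the general rule that an operator with several input channels advances its event-time watermark to the minimum of the latest watermarks received on those channels. It does not reproduce the planner behaviour described in this ticket. The class name WatermarkCombiner and its methods are hypothetical and are not Flink APIs.

```java
import java.util.Arrays;

/**
 * Hypothetical illustration (not a Flink class): tracks the latest watermark
 * received on each input channel and exposes the combined watermark, which is
 * the minimum over all channels.
 */
public class WatermarkCombiner {
    private final long[] channelWatermarks;

    public WatermarkCombiner(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No watermark received yet: each channel starts at "no progress".
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; a channel's watermark never moves backwards. */
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** The combined watermark is the minimum of all channel watermarks. */
    public long combined() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        WatermarkCombiner c = new WatermarkCombiner(2);
        c.onWatermark(0, 1_000L);
        // Channel 1 has not reported a watermark yet, so it holds back the result.
        System.out.println(c.combined()); // prints -9223372036854775808 (Long.MIN_VALUE)
        c.onWatermark(1, 500L);
        System.out.println(c.combined()); // prints 500
        // A channel reporting Long.MAX_VALUE no longer constrains the minimum.
        c.onWatermark(1, Long.MAX_VALUE);
        System.out.println(c.combined()); // prints 1000
    }
}
```

The takeaway is only that where watermarks are generated relative to shuffles and stateful nodes affects which per-instance values get combined downstream.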



--
This message was sent by Atlassian Jira
(v8.3.4#803005)