[jira] [Created] (FLINK-20369) Improve the digest of TableSourceScan and Sink node


Shang Yuanchun (Jira)
Jark Wu created FLINK-20369:
-------------------------------

             Summary: Improve the digest of TableSourceScan and Sink node
                 Key: FLINK-20369
                 URL: https://issues.apache.org/jira/browse/FLINK-20369
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Jark Wu


Currently, the plan digest has a few shortcomings:

1. The digest of {{TableSourceScan}} and {{Sink}} does not contain connector information, which would be quite useful when debugging.
2. The table name is verbose when the table is in the default catalog and database; it would be better to shorten it to just the table name in that case.
3. It may be nicer to show the changelog mode of the source and sink, since it is meta information from {{DynamicTableSource/Sink#getChangelogMode}}.


{code}
Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
+- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
   +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
      :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
      :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
      :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
      :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
      :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
      :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
      :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
      :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
      :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
         +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
            +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
               +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
                  +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
{code}
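
To make points 1 and 2 concrete, here is a minimal sketch of how a shortened table name and a connector entry could be rendered in a scan digest. It is not Flink planner code: the class {{DigestSketch}}, both method names, the {{connector=[...]}} key and the overall digest layout are assumptions made only for illustration, and the connector value is passed in as a plain string.

```java
import java.util.List;

/** Hypothetical helper for illustration only; not part of Flink. */
final class DigestSketch {

    private DigestSketch() {}

    /**
     * Returns only the table name when the three-part path points into the
     * given default catalog and database; otherwise returns the full
     * dot-separated path.
     */
    static String simplifyTableName(
            List<String> path, String defaultCatalog, String defaultDatabase) {
        if (path.size() == 3
                && path.get(0).equals(defaultCatalog)
                && path.get(1).equals(defaultDatabase)) {
            return path.get(2);
        }
        return String.join(".", path);
    }

    /**
     * Builds a scan digest that also carries the connector name and the
     * changelog mode. The layout is an assumption, not the format Flink uses.
     */
    static String scanDigest(
            List<String> path,
            String connector,
            List<String> fields,
            String changelogMode) {
        // Assumed defaults, taken from the names shown in the plan above.
        String table = simplifyTableName(path, "default_catalog", "default_database");
        return "TableSourceScan(table=[" + table + "]"
                + ", connector=[" + connector + "]"
                + ", fields=[" + String.join(", ", fields) + "]"
                + ", changelogMode=[" + changelogMode + "])";
    }
}
```

With the path {{[default_catalog, default_database, source_city]}}, fields {{id, city_name}}, changelog mode {{UA,D}} and a placeholder connector name such as {{kafka}}, {{scanDigest}} yields {{TableSourceScan(table=[source_city], connector=[kafka], fields=[id, city_name], changelogMode=[UA,D])}}. A table outside the default catalog or database keeps its full path.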




--
This message was sent by Atlassian Jira
(v8.3.4#803005)