[DISCUSS] Introduce SupportsParallelismReport and SupportsStatisticsReport for Hive and Filesystem

Jingsong Li
Hi all,

## SupportsParallelismReport

Now that FLIP-95 [1] is ready, only Hive and Filesystem are still using the
old interfaces.

We are considering migrating to the new interface.

However, one problem is that in the old interface implementation, the
connectors infer parallelism by themselves instead of using the global
parallelism configuration. Hive and Filesystem determine the parallelism
from the number of files and their sizes. This way, a large table may use
a parallelism in the thousands, while a small table may use only 10, which
minimizes task-scheduling overhead.
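
As a rough illustration of inferring parallelism from file count and file size, here is a minimal sketch. The class name, the `targetBytesPerTask` and `maxParallelism` parameters, and the rule that files are never split are all assumptions made for this example; this is not the actual Hive or Filesystem connector logic.

```java
/** Hypothetical helper for illustration; not Flink's actual inference logic. */
final class FileParallelismInference {

    private FileParallelismInference() {}

    /**
     * Infers a source parallelism from the files that remain after pruning.
     *
     * @param fileSizesInBytes sizes of the files to read
     * @param targetBytesPerTask assumed amount of data one parallel task should read
     * @param maxParallelism assumed upper bound on the inferred parallelism
     */
    static int infer(long[] fileSizesInBytes, long targetBytesPerTask, int maxParallelism) {
        if (targetBytesPerTask < 1 || maxParallelism < 1) {
            throw new IllegalArgumentException("target size and max parallelism must be >= 1");
        }
        long totalBytes = 0;
        for (long size : fileSizesInBytes) {
            totalBytes += size;
        }
        // Ceiling division: number of tasks needed to stay within the target size per task.
        long tasksBySize = (totalBytes + targetBytesPerTask - 1) / targetBytesPerTask;
        // Assumption: files are not split, so more tasks than files would sit idle.
        long tasks = Math.min(tasksBySize, fileSizesInBytes.length);
        // A parallelism must be at least 1, even for an empty table.
        return (int) Math.max(1, Math.min(tasks, maxParallelism));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Small dimension table: 3 files of 10 MB each.
        System.out.println(infer(new long[] {10 * mb, 10 * mb, 10 * mb}, 64 * mb, 1000));
        // Large fact table: 5000 files of 128 MB each.
        long[] large = new long[5000];
        java.util.Arrays.fill(large, 128 * mb);
        System.out.println(infer(large, 64 * mb, 1000));
    }
}
```

With these assumed numbers, the small table ends up with a parallelism of 1 and the large table is capped at 1000, which is the kind of gap between tables that a single global setting cannot express.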

This situation is very common in batch computing. For example, in a star
schema, a large table needs to be joined with multiple small tables.

So we should give this ability to the new table source interfaces. The
interface could be:

import org.apache.flink.annotation.Internal;

/**
 * Gives a source the ability to report its parallelism.
 *
 * <p>After filter push-down and partition push-down, the source has more information,
 * which can help it infer a more effective parallelism.
 */
@Internal
public interface SupportsParallelismReport {

    /**
     * Reports the parallelism of the source or sink. The parallelism of an operator must be
     * at least 1, or -1 (use the system default).
     */
    int reportParallelism();
}


Rejected Alternatives:
- SupportsSplitReport: What is the relationship between this split and the
split of FLIP-27? Do we have to match them one to one? I think they are two
independent things. In fact, in the FLIP-27 design, splits and parallelism
are not bound one to one.
- SupportsPartitionReport: What is a partition? In Table/SQL, a partition
is a specific concept of a table. It should not be mixed up with
parallelism.

## SupportsStatisticsReport

As with parallelism, statistics reported by the source will be more
appropriate and accurate. After filter push-down and partition push-down,
the source has more information, which can help it infer more useful
statistics. If the planner infers the statistics only by itself, there may
be a big gap between the statistics and the real situation.

The interface:

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.plan.stats.TableStats;

/**
 * Gives a {@link ScanTableSource} the ability to report table statistics.
 *
 * <p>Statistics can be inferred from the real data in real time, which makes them more
 * accurate than the statistics in the catalog.
 *
 * <p>After filter push-down and partition push-down, the source has more information,
 * which can help it infer more accurate table statistics.
 */
@Internal
public interface SupportsStatisticsReport {

    /**
     * Reports {@link TableStats}, given the old table stats.
     */
    TableStats reportTableStatistics(TableStats oldStats);
}


When should the planner invoke the statistics report?
- First of all, this call can be expensive (it has to look at the metadata
of the files), so it should not be called repeatedly.
- We need to call it after FilterPushdown, because that is when the most
accurate information is available. We also need to call it before CBO (such
as JoinReorder and choosing between BroadcastJoin and ShuffleJoin), because
that is where the statistics are used.
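
One way to respect the "expensive, so don't call it repeatedly" constraint is to compute the report once and reuse the result in every later optimizer phase. The sketch below only shows that caching idea; `CachedReport` is a hypothetical name, and where the planner would actually hold such a value is not specified in this thread.

```java
import java.util.function.Supplier;

/** Hypothetical wrapper: computes an expensive report at most once and reuses it. */
final class CachedReport<T> implements Supplier<T> {

    private final Supplier<T> expensiveReport;
    private T cached;
    private boolean computed;

    CachedReport(Supplier<T> expensiveReport) {
        this.expensiveReport = expensiveReport;
    }

    @Override
    public synchronized T get() {
        if (!computed) {
            // First call: this is where file listing and metadata reads would happen.
            cached = expensiveReport.get();
            computed = true;
        }
        return cached;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        CachedReport<Long> rowCount = new CachedReport<>(() -> {
            calls[0]++;
            return 42L; // stands in for a row count derived from file metadata
        });
        rowCount.get(); // e.g. used by join reordering
        rowCount.get(); // e.g. used when choosing the join strategy
        System.out.println("expensive calls: " + calls[0]);
    }
}
```

The wrapper would have to be created after filter and partition push-down have been applied, otherwise the cached value reflects the unpruned table.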

Rejected Alternatives:
- Using CatalogTableStatistics: CatalogTableStatistics or TableStats? I
lean towards TableStats, because TableStats is the class used by the
planner, while CatalogTableStatistics may contain some catalog information
that is not relevant to the planner's optimizer.

## Internal or Public

I personally lean towards internal, since these interfaces are only used by
Hive and Filesystem. Another option is: SupportsParallelismReport (Internal;
I haven't seen this requirement from outside) and SupportsStatisticsReport
(Public; maybe the Apache Iceberg Flink connector can use it).

What do you think?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

Best,
Jingsong Lee

Re: [DISCUSS] Introduce SupportsParallelismReport and SupportsStatisticsReport for Hive and Filesystem

Benchao Li-2
Hi Jingsong,

Regarding SupportsParallelismReport,
I think streaming connectors can also benefit from it.
I have seen requests on the user mailing list from users who want to
control the parallelism of a source/sink instead of setting it to the
global parallelism.
We also did this in our company.

In reply to Jingsong Li's message of Thu, Jul 30, 2020, 11:16 AM.


--

Best,
Benchao Li

Re: [DISCUSS] Introduce SupportsParallelismReport and SupportsStatisticsReport for Hive and Filesystem

Kurt Young
Hi Jingsong,

Thanks for bringing up this discussion. In general, I'm +1 on enriching the
source abilities with parallelism and statistics reporting, but I'm not sure
whether introducing such "SupportsXXX" interfaces is a good idea. I will
share my thoughts on each point separately.

1) Regarding the interface SupportsParallelismReport: first of all, my
feeling is that such a mechanism is not like other abilities such as
SupportsProjectionPushDown. The parallelism of the source operator is
decided anyway; the only difference is whether it is decided purely by the
framework or by the table source itself. So another way to look at this is
that we can always assume a table source has the ability to determine its
parallelism. The table source can choose to set the parallelism itself or
delegate it to the framework.

This might sound like personal taste, but there is another bad case if we
introduce the interface. As you may already know, we currently have two
major kinds of table sources, LookupTableSource and ScanTableSource.
IIUC it won't make much sense if a user provides a LookupTableSource that
also implements SupportsParallelismReport.

An alternative solution would be to add the method you want directly to
ScanTableSource, with a default implementation returning -1, which means
letting the framework decide the parallelism.
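
To make this alternative concrete, here is a minimal sketch of a default method that returns -1. `ScanSourceSketch` and `DefaultMethodParallelism` are hypothetical stand-ins invented for this example; they are not Flink's ScanTableSource or any planner class.

```java
/** Hypothetical stand-in for ScanTableSource; this is not Flink's real interface. */
interface ScanSourceSketch {

    /** Returns the desired parallelism, or -1 to let the framework decide. */
    default int reportParallelism() {
        return -1;
    }
}

/** Hypothetical planner-side helper that resolves the final parallelism. */
final class DefaultMethodParallelism {

    private DefaultMethodParallelism() {}

    static int resolve(ScanSourceSketch source, int frameworkParallelism) {
        int reported = source.reportParallelism();
        if (reported == -1) {
            // The source delegates the decision to the framework.
            return frameworkParallelism;
        }
        if (reported < 1) {
            throw new IllegalArgumentException(
                    "parallelism must be >= 1 or -1, got " + reported);
        }
        return reported;
    }

    public static void main(String[] args) {
        // A source that does not override the method keeps the framework default.
        ScanSourceSketch plain = new ScanSourceSketch() {};
        // A source that knows its input can ask for a specific parallelism.
        ScanSourceSketch fileBased = new ScanSourceSketch() {
            @Override
            public int reportParallelism() {
                return 3;
            }
        };
        System.out.println(resolve(plain, 8));
        System.out.println(resolve(fileBased, 8));
    }
}
```

With this shape, existing sources need no changes, and there is no separate interface that a LookupTableSource could implement by mistake.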

2) Regarding the interface SupportsStatisticsReport: it seems this
interface doesn't work for unbounded streaming table sources. What kind of
implementation do you expect in such a case? And how does this interface
work with LookupTableSource?
Another question: what is the oldStats parameter used for?

3) Internal or Public: I don't think we should mark them as internal. The
fact that they are currently used only by internal connectors doesn't mean
the interfaces should be internal. I can imagine there will be lots of
Filesystem-like connectors outside the project that need such a capability.

Best,
Kurt


In reply to Benchao Li's message of Thu, Jul 30, 2020, 1:02 PM.

Re: [DISCUSS] Introduce SupportsParallelismReport and SupportsStatisticsReport for Hive and Filesystem

godfreyhe
Thanks Jingsong for bringing up this discussion,
and thanks Kurt for the detailed thoughts.

First of all, I also think it's a very useful feature to expose more
abilities for table sources.

1) If we want to support [1], it seems that SupportsParallelismReport
does not meet the requirement when there are multiple Transformations in
the source operator and they require different parallelisms.

2) Regarding "SupportsXXX" for ScanTableSource or LookupTableSource:
currently, we do not distinguish between them for the existing "SupportsXXX"
interfaces either. For example, a LookupTableSource should not implement
SupportsWatermarkPushDown or SupportsComputedColumnPushDown.
A DynamicTableSource subclass implements a "SupportsXXX" interface only if
it has that capability.
So an unbounded table source should not implement SupportsStatisticsReport,
or, if a table source works for both bounded and unbounded data, it can
just return unknown in the unbounded case.

I think SupportsStatisticsReport is a supplement to catalog statistics;
that means SupportsStatisticsReport takes effect only when the catalog
statistics are unknown.
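
The two rules above (report unknown for unbounded input, and consult the source only when the catalog has nothing) can be sketched as follows. A plain row count stands in for a full TableStats object, and the -1 marker and all names here are assumptions made for this example.

```java
/** Hypothetical sketch; a row count stands in for a full TableStats object. */
final class StatsFallbackSketch {

    /** Assumed marker for "no statistics available". */
    static final long UNKNOWN = -1L;

    private StatsFallbackSketch() {}

    /** Source side: report a row count only for bounded input. */
    static long reportRowCount(boolean bounded, long rowsInSelectedFiles) {
        return bounded ? rowsInSelectedFiles : UNKNOWN;
    }

    /** Planner side: consult the source report only if the catalog has no statistics. */
    static long chooseRowCount(long catalogRowCount, long reportedRowCount) {
        return catalogRowCount != UNKNOWN ? catalogRowCount : reportedRowCount;
    }

    public static void main(String[] args) {
        // The catalog knows nothing and the source is bounded: use the reported count.
        System.out.println(chooseRowCount(UNKNOWN, reportRowCount(true, 1_000L)));
        // The catalog already has statistics: the report is ignored.
        System.out.println(chooseRowCount(500L, reportRowCount(true, 1_000L)));
        // Unbounded source and empty catalog: the result stays unknown.
        System.out.println(chooseRowCount(UNKNOWN, reportRowCount(false, 1_000L)));
    }
}
```

Under these assumptions the three calls print 1000, 500, and -1.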

3) +1 to making them public.

[1] https://issues.apache.org/jira/browse/FLINK-18674

Best,
Godfrey



In reply to Kurt Young's message of Thu, Jul 30, 2020, 4:01 PM.

Re: [DISCUSS] Introduce SupportsParallelismReport and SupportsStatisticsReport for Hive and Filesystem

Jingsong Li
Hi, thanks for your responses.

To Benchao:

Glad to hear about your work and requirements. The interfaces should be Public.

To Kurt:

1. Regarding "SupportsXXX" for ScanTableSource, LookupTableSource, or
DynamicTableSink: I don't think a "SupportsXXX" interface must work with all
three types. As Godfrey said, a LookupTableSource should not implement
SupportsWatermarkPushDown or SupportsComputedColumnPushDown. We just try our
best to make the combinations work; for example, SupportsParallelismReport
can work with both ScanTableSource and DynamicTableSink.

As for adding the "reportParallelism" method directly to ScanTableSource
and DynamicTableSink: I think most sources/sinks do not want to see this
method. Providing a "SupportsXXX" interface aims to give connector
developers an optional choice.
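
The opt-in style argued for here could look roughly like the sketch below: connectors that do not care never see the method, and the planner checks for the interface on either a source or a sink. `ParallelismReportSketch` and `OptInParallelism` are hypothetical names for this example, not the proposed Flink classes.

```java
/** Hypothetical stand-in for the proposed SupportsParallelismReport interface. */
interface ParallelismReportSketch {

    /** Returns a parallelism of at least 1, or -1 to use the system default. */
    int reportParallelism();
}

/** Hypothetical planner-side helper; works for any connector object, source or sink. */
final class OptInParallelism {

    private OptInParallelism() {}

    static int resolve(Object connector, int globalParallelism) {
        if (connector instanceof ParallelismReportSketch) {
            int reported = ((ParallelismReportSketch) connector).reportParallelism();
            if (reported >= 1) {
                return reported;
            }
            if (reported != -1) {
                throw new IllegalArgumentException(
                        "parallelism must be >= 1 or -1, got " + reported);
            }
        }
        // Connectors that did not opt in, or that returned -1, use the global setting.
        return globalParallelism;
    }

    public static void main(String[] args) {
        ParallelismReportSketch fileSink = () -> 4;
        Object plainSource = new Object();
        System.out.println(resolve(fileSink, 16));
        System.out.println(resolve(plainSource, 16));
    }
}
```

The trade-off against a default method is an extra `instanceof` check in the planner in exchange for keeping the base source and sink interfaces unchanged.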

2. Regarding SupportsStatisticsReport not working for unbounded streaming
table sources: yes, that's right. Statistics (including catalog statistics)
are currently not relevant to streaming tables, but I think we can create
more useful statistics for streaming tables in the future.

3. Regarding "oldStats" in SupportsStatisticsReport: "oldStats" should be
renamed to "catalogStats". The source just tries its best to provide more
useful and accurate statistics, but as Godfrey said, it is a supplement to
the catalog statistics; it can just fill in information that is missing or
inaccurate in the catalog.

4. Internal or Public: I am glad to see your requirements. I am OK with
Public.

To Godfrey:

Regarding the case where there are multiple Transformations in the source
operator and they require different parallelisms: in this case, it should
be left to the source to set the parallelism. So these are two orthogonal
things. Users who do not use multiple Transformations still need to set the
parallelism.

Best,
Jingsong

In reply to godfrey he's message of Thu, Jul 30, 2020, 8:31 PM.
> and
> > > > Filesystem, another way is: SupportsParallelismReport(Internal, I
> > haven't
> > > > seen this requirement from outside.) and
> > SupportsStatisticsReport(Public,
> > > > maybe Apache Iceberg Flink connector can use it).
> > > >
> > > > What do you think?
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>


--
Best, Jingsong Lee

Re: [DISCUSS] Introduce SupportsParallelismReport and SupportsStatisticsReport for Hive and Filesystem

Kurt Young
1. Even if some "Supports" interfaces are not orthogonal to
ScanTableSource and LookupTableSource, that doesn't mean we should
encourage such usage. Such conceptual conflicts will accumulate into
larger issues that will hurt us in the future.

2. Regarding SupportsStatisticsReport, I think the interface is a bit
fuzzy. From the interface name, I was expecting that the source would
gather and report statistics of its own. But it also receives some catalog
statistics; what are they for? Why does the table source need to *report*
statistics when the catalog already provides some? Will these catalog
statistics always exist?

3.
> Regarding If there are multiple Transformations in source op, and they
> require different parallelism. In this case, it should be left to the
> source to set the parallelism
This sounds like a contradiction of the interface you want to introduce.
Now I'm more confused: do you want the framework to take care of the
parallelism setting for the source operator, or do you want to let the
source operator set the parallelism itself?

Best,
Kurt



Re: [DISCUSS] Introduce SupportsParallelismReport and SupportsStatisticsReport for Hive and Filesystem

Jingsong Li
Hi Kurt, thanks for your response. I get your points and they are
reasonable, but let me explain more:

## Parallelism setting for source and sink

Apart from `SupportsParallelismReport`, the options are:

1. Support multiple transformations
If we support multiple transformations, we are effectively back to
DataStream. Personally I am slightly -1 on this: it makes many pieces of
functionality non-orthogonal and makes future optimizer development more
difficult. As @Jark Wu <[hidden email]> said, it will be difficult to
support state compatibility, parallelism configuration, and message
ordering guarantees in the future.
If we really want to support multiple transformations, then yes, there is
no need for `SupportsParallelismReport`; we can simply use multiple
transformations, and the other interfaces (Source, SourceFunction,
InputFormat) may become less and less relevant.

2. `getParallelism` in ScanSource and DynamicSink
Why did we introduce the "SupportsXXX" interfaces in the first place? Every
"SupportsXXX" method could have been placed directly on ScanSource. The
"SupportsXXX" interfaces keep ScanSource cleaner: they are optional to
implement, which is friendlier to users. So I am -1 on `getParallelism` in
ScanSource and DynamicSink. I don't think the parallelism setting is
important enough to show to every user.
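
To make the "optional ability" argument concrete, here is a minimal sketch. Only `SupportsParallelismReport` and `reportParallelism()` come from the proposal in this thread; `DemoScanSource`, `PlainSource`, `FileSource`, `ParallelismResolver`, and the one-task-per-file rule are hypothetical names and assumptions made up for illustration, not Flink APIs.

```java
import java.util.List;

/** The optional ability proposed in this thread. */
interface SupportsParallelismReport {
    /** Returns a parallelism of at least 1, or -1 to use the system default. */
    int reportParallelism();
}

/** Hypothetical stand-in for a scan source; not Flink's real ScanTableSource. */
interface DemoScanSource {
    String name();
}

/** A source that does not care about parallelism never sees the method. */
final class PlainSource implements DemoScanSource {
    @Override
    public String name() {
        return "plain";
    }
}

/** A file-based source that opts in and infers parallelism from its file count. */
final class FileSource implements DemoScanSource, SupportsParallelismReport {
    private final List<String> files;
    private final int maxParallelism;

    FileSource(List<String> files, int maxParallelism) {
        this.files = files;
        this.maxParallelism = maxParallelism;
    }

    @Override
    public String name() {
        return "files";
    }

    @Override
    public int reportParallelism() {
        // Assumed rule: one task per file, capped by an upper bound, never below 1.
        return Math.max(1, Math.min(files.size(), maxParallelism));
    }
}

/** Hypothetical planner-side helper that consults the ability if it is present. */
final class ParallelismResolver {
    static int resolve(DemoScanSource source, int defaultParallelism) {
        if (source instanceof SupportsParallelismReport) {
            int reported = ((SupportsParallelismReport) source).reportParallelism();
            if (reported >= 1) {
                return reported;
            }
            // -1 (or any other non-positive value in this sketch) falls back to the default.
        }
        return defaultParallelism;
    }
}
```

With this shape, `PlainSource` carries no parallelism method at all, while `FileSource` with three files and a cap of 100 would resolve to 3 instead of the global default.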

Can `LookupTableSource` have parallelism?
I think yes. A lookup is an action, and this action must run in tasks, so
the parallelism of those tasks can be configured. Sometimes the
parallelism affects lookup performance and is worth configuring.

## SupportsStatisticsReport and catalog statistics

One of the scenarios I envisioned was:
HiveSource can get the file size easily and at a small cost, but it cannot
get other statistics.
In reality, the statistics from the catalog are often inaccurate. What I
want to do is check the catalog statistics against the actual file size:
- If the difference is too big, discard the catalog statistics.
- If the difference is not big, continue to use the catalog statistics.

The statistics actually collected can interact positively with the catalog
statistics, which makes the final statistics more complete and accurate.
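
A rough sketch of that check follows. Everything here is an assumption for illustration: `DemoStats` is a simplified stand-in rather than Flink's `TableStats`, `FileSizeStatsReporter` is a hypothetical class, the row count is estimated from the file size with an assumed bytes-per-row value, and "discard" is interpreted as "replace with the file-size estimate".

```java
/** Hypothetical, simplified stand-in for planner statistics (row count only). */
final class DemoStats {
    /** A negative row count means "unknown" in this sketch. */
    static final DemoStats UNKNOWN = new DemoStats(-1L);

    final long rowCount;

    DemoStats(long rowCount) {
        this.rowCount = rowCount;
    }
}

/** Hypothetical reporter that validates catalog statistics against the file size. */
final class FileSizeStatsReporter {
    private final long totalFileBytes;     // cheap to read from file metadata
    private final long assumedBytesPerRow; // assumption used to estimate the row count
    private final double maxRatio;         // largest tolerated gap between the two values

    FileSizeStatsReporter(long totalFileBytes, long assumedBytesPerRow, double maxRatio) {
        if (assumedBytesPerRow <= 0) {
            throw new IllegalArgumentException("assumedBytesPerRow must be positive");
        }
        this.totalFileBytes = totalFileBytes;
        this.assumedBytesPerRow = assumedBytesPerRow;
        this.maxRatio = maxRatio;
    }

    DemoStats reportTableStatistics(DemoStats catalogStats) {
        long estimatedRows = Math.max(1L, totalFileBytes / assumedBytesPerRow);
        if (catalogStats == null || catalogStats.rowCount < 0) {
            // Nothing usable in the catalog: supplement with the estimate.
            return new DemoStats(estimatedRows);
        }
        long catalogRows = Math.max(1L, catalogStats.rowCount);
        double ratio = (double) Math.max(catalogRows, estimatedRows)
                / (double) Math.min(catalogRows, estimatedRows);
        // Difference too big: discard the catalog value. Otherwise keep it.
        return ratio > maxRatio ? new DemoStats(estimatedRows) : catalogStats;
    }
}
```

For example, with 1,000,000 bytes of files and an assumed 100 bytes per row, a catalog row count of 9,000 would be kept under a tolerance ratio of 10, while a catalog row count of 10 would be replaced by the estimate.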

If you don't think this is a reasonable requirement, I am OK to remove the
catalog statistics argument.

Best,
Jingsong



--
Best, Jingsong Lee