[DISCUSS] Improvements to flink's cost based optimizer


Kurt Young
Hi,

Currently Flink already uses a cost-based optimizer, but because we lack
accurate statistics and the cost model is simple, we don't actually gain
much from this framework. I have proposed some improvements and a rough
implementation plan in the following document:
https://docs.google.com/document/d/1X7pEWHmuVywbJ8xFtiUY8Cpyw5A1u-4c4tKeODp-W-0/

I hope to hear some feedback from you.

best,
Kurt

Re: [DISCUSS] Improvements to flink's cost based optimizer

Fabian Hueske
Hi Kurt,

thanks for starting this discussion!
Although we use Calcite's cost-based optimizer, we do not use its full
potential. As you correctly identified, this is mainly due to the lack of
reliable statistics.
Moreover, we use Calcite only for logical optimization, i.e., the optimizer
basically rewrites the query and pushes down filters and projections (no
join reordering yet).
For batch queries, the logically optimized plan is translated into a
DataSet program, and the DataSet optimizer chooses the physical execution
plan (shipping strategies, hash vs. merge join, etc.).
It would be great if we could improve this situation for batch tables
(stats on streaming tables might change while a query is executed) by doing
the complete optimization (logical & physical) in Calcite.
However, this will be a long way.
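
To make the current split concrete, here is a small sketch, assuming the
Java batch Table API roughly as it looks in Flink 1.2 (so take the exact
method names with a grain of salt). explain() prints the Calcite logical
plan next to the execution plan that the DataSet optimizer chose:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class ExplainSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    DataSet<Tuple2<Integer, String>> input =
        env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));

    // Calcite only sees and rewrites the logical plan of this query ...
    Table result = tEnv.fromDataSet(input, "id, name")
        .filter("id > 1")
        .select("name");

    // ... while the physical strategies are chosen later by the DataSet
    // optimizer. explain() shows both plans.
    System.out.println(tEnv.explain(result));
  }
}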

I agree with your first step of designing a catalog / stats store
component that can store and provide table and column statistics.
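
As a purely illustrative sketch of such a component (all names below are
made up for this mail, not an existing API), it could look like this:

import java.util.Map;
import java.util.Optional;

// Hypothetical per-column statistics.
class ColumnStats {
  final long distinctCount;
  final long nullCount;
  ColumnStats(long distinctCount, long nullCount) {
    this.distinctCount = distinctCount;
    this.nullCount = nullCount;
  }
}

// Hypothetical per-table statistics.
class TableStats {
  final long rowCount;
  final Map<String, ColumnStats> columnStats;
  TableStats(long rowCount, Map<String, ColumnStats> columnStats) {
    this.rowCount = rowCount;
    this.columnStats = columnStats;
  }
}

// Hypothetical catalog-backed store the optimizer would query for every
// base table before estimating costs.
interface StatisticsStore {
  Optional<TableStats> getTableStats(String tableName);
  void putTableStats(String tableName, TableStats stats);
}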
Once we have stats about tables, we can start to improve the optimization
step-by-step.
The first thing should be to improve the logical optimization and enable
join reordering.
Once we have that, we can start to choose execution plans for operators by
using the optimizer hints of the DataSet optimizer. This will also involve
tracking the physical properties of intermediate results (sorting,
partitioning, etc.) in Calcite.
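
For reference, such hints already exist at the DataSet level; a minimal
sketch (with placeholder data) of forcing a broadcast hash join:

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, String>> large =
        env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));
    DataSet<Tuple2<Integer, Long>> small =
        env.fromElements(Tuple2.of(1, 10L));

    // The hint fixes the physical strategy: broadcast the (small) second
    // input and build a hash table from it.
    large.join(small, JoinHint.BROADCAST_HASH_SECOND)
        .where(0)
        .equalTo(0)
        .print();
  }
}

The longer-term goal would be for the Calcite-based optimizer to derive
such choices from the statistics instead of relying on user-provided hints.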

I would also recommend keeping the cost model as simple as possible.
A detailed cost model is hard to reason about and does not really help if
its parameters are imprecise.
There are just too many numbers to get wrong, such as input cardinalities,
selectivities, or the cost ratio of disk to network IO.

A few open questions remain:
- How do we handle cases where there are not sufficient statistics for all
tables? For example, a query on a Table which was derived from a DataSet
(no stats) and is joined with some external tables that have stats.
- Should we control the parallelism of operators based on cardinality
information?


Best, Fabian


Re: [DISCUSS] Improvements to flink's cost based optimizer

Kurt Young
Hi Fabian,

Thanks for your detailed response and sorry for my late reply. Your points
all make sense to me, and here are some thoughts on your open questions:

- Regarding tables without sufficient statistics, especially the kind of
"dynamic" table that is derived from an arbitrary DataSet whose statistics
cannot be analyzed beforehand: I think in a first version we can just
provide some fake, fixed statistics to let the process work. Another
approach is to save the DataSet as an intermediate result table and analyze
its statistics before further operations. In the future, a more advanced
and ideal way would be to keep collecting statistics while the job is
running and to have a way to dynamically modify the plan during job
execution.
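
To make the fixed-default idea a bit more concrete, a tiny illustrative
sketch (the names and numbers are invented for this mail):

import java.util.HashMap;
import java.util.Map;

// Illustrative fallback: if nothing is known about a table (e.g. it was
// derived from an arbitrary DataSet), return fixed default statistics so
// that cost-based optimization can still run end-to-end.
class RowCountProvider {
  // Arbitrary default, only meant to keep the optimizer going.
  private static final long DEFAULT_ROW_COUNT = 1_000_000L;

  private final Map<String, Long> knownRowCounts = new HashMap<>();

  void setRowCount(String tableName, long rowCount) {
    knownRowCounts.put(tableName, rowCount);
  }

  long getRowCountOrDefault(String tableName) {
    return knownRowCounts.getOrDefault(tableName, DEFAULT_ROW_COUNT);
  }
}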

- Regarding parallelism control, I think it's a good use case for
statistics. Once we have good cost estimates and know what performance the
user expects from the job, we can definitely do some auto-tuning for them.
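
For example, a very rough sketch of deriving an operator parallelism from
an estimated cardinality (the helper and the numbers are only illustrative):

// Illustrative only: derive an operator parallelism from an estimated
// output cardinality, capped by what the cluster offers.
final class ParallelismSuggestion {
  static int suggest(long estimatedRows, long targetRowsPerSubtask,
                     int maxParallelism) {
    long needed = (estimatedRows + targetRowsPerSubtask - 1) / targetRowsPerSubtask;
    return (int) Math.max(1, Math.min(maxParallelism, needed));
  }
}

// e.g. suggest(50_000_000L, 5_000_000L, 64) -> 10 subtasks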

I have opened a JIRA to track the status and the detailed implementation
steps for this issue: https://issues.apache.org/jira/browse/FLINK-5565.
Whoever is interested in this topic can continue the discussion there,
either in the parent JIRA or in the sub-tasks.

Best,
Kurt


Re: [DISCUSS] Improvements to flink's cost based optimizer

Fabian Hueske
Hi Kurt,

thanks for breaking the overall effort down into smaller tasks and creating
the corresponding JIRA issues.

Using default estimates for unknown tables can be quite risky, especially
for statistics like cardinality.
In these cases, collecting basic stats while writing the input (i.e., an
arbitrary DataSet) to some storage and reading the data back might be a
viable default. A frequently requested feature for the DataSet API is to
cache a DataSet in memory (with spilling to local disk). This might help
here as well.
We could also offer a hook for the user to inject base statistics such as
cardinality.
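
A small sketch of the "collect basic stats while writing" idea, using the
existing accumulator mechanism (the output path and the accumulator name
are just placeholders):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class CountWhileWriting {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<String> input = env.fromElements("a", "b", "c");

    // Count rows as a side effect while the data flows to the sink.
    DataSet<String> counted = input.map(new RichMapFunction<String, String>() {
      private final LongCounter rowCount = new LongCounter();

      @Override
      public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator("row-count", rowCount);
      }

      @Override
      public String map(String value) {
        rowCount.add(1L);
        return value;
      }
    });

    counted.writeAsText("/tmp/intermediate-result"); // placeholder path

    JobExecutionResult result = env.execute("materialize + count");
    Long rows = result.getAccumulatorResult("row-count");
    // 'rows' could now be stored as the table's cardinality before the
    // follow-up query is optimized.
    System.out.println("row count: " + rows);
  }
}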

Regarding the cost model and parallelism: right now, we mainly optimize to
reduce data volume (network + disk IO). Optimizing for execution time
(which is required when choosing the parallelism) is harder because you
need to combine network and disk IO, CPU, and parallelism into a cost
formula. How much these factors contribute to execution time is very
specific to the hardware / cluster you are running on. If we want to go in
this direction, we might need a method to benchmark the system and
calibrate the cost model.
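
Just to illustrate what such a calibrated, time-based cost could look like
(the constants and the formula are invented for this example; this is not
Flink's actual cost model):

// Illustrative time-based cost: convert resource usage into estimated
// seconds using calibration constants measured on the target cluster.
final class CalibratedCost {
  // Hypothetical calibration results, e.g. from a benchmark run.
  static final double NET_BYTES_PER_SEC  = 100e6;  // ~100 MB/s network
  static final double DISK_BYTES_PER_SEC = 200e6;  // ~200 MB/s disk
  static final double CPU_OPS_PER_SEC    = 50e6;   // rough per-core rate

  static double estimatedSeconds(double netBytes, double diskBytes,
                                 double cpuOps, int parallelism) {
    double perTaskNet  = netBytes  / parallelism / NET_BYTES_PER_SEC;
    double perTaskDisk = diskBytes / parallelism / DISK_BYTES_PER_SEC;
    double perTaskCpu  = cpuOps    / parallelism / CPU_OPS_PER_SEC;
    // Assume the phases overlap, so the slowest resource dominates.
    return Math.max(perTaskNet, Math.max(perTaskDisk, perTaskCpu));
  }
}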

Best,
Fabian

