[DISCUSS] Unified Core API for Streaming and Batch


[DISCUSS] Unified Core API for Streaming and Batch

Haibo Sun
Hi all,
This post proposes a unified core API for Streaming and Batch.
Currently, DataStream and DataSet adopt separate compilation processes, execution tasks,
and basic programming models in the runtime layer, which complicates the system implementation.
We think that batch jobs can be processed in the same way as streaming jobs, so we can unify
the execution stack of DataSet into that of DataStream. After the unification, the DataSet API will
also be built on top of StreamTransformation, and its basic programming model will change
from "UDF on Driver" to "UDF on StreamOperator". Although the DataSet operators will need to
implement the StreamOperator interface after the unification, user jobs will not need to change,
since DataSet uses the same UDF interfaces as DataStream.

The unification has at least three benefits:
1. The system will be greatly simplified with the same execution stack for both streaming and batch jobs.
2. It is no longer necessary to implement two sets of Drivers (operator strategies) for batch, namely chained and non-chained.
3. The unified programming model enables streaming and batch jobs to share the same operator implementation.

The following is the design draft; any feedback is highly appreciated: https://docs.google.com/document/d/1G0NUIaaNJvT6CMrNCP6dRXGv88xNhDQqZFrQEuJ0rVU/edit?usp=sharing
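To make the programming-model change concrete, here is a minimal sketch of what a
DataSet-style map could look like in the "UDF on StreamOperator" model. It mirrors the
existing StreamMap operator in flink-streaming; the class name UnifiedMapOperator is
hypothetical and only illustrative, not something taken from the design doc:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// A map in the "UDF on StreamOperator" model: the runtime task pushes
// records into the operator, which invokes the user's MapFunction per
// element and emits the result downstream.
public class UnifiedMapOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    public UnifiedMapOperator(MapFunction<IN, OUT> userFunction) {
        super(userFunction);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // The MapFunction here is the same UDF interface DataSet uses,
        // which is why user jobs would not need to change; only the
        // hosting operator differs from the batch Driver model.
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}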

Best,
Haibo

Re: [DISCUSS] Unified Core API for Streaming and Batch

Zhijiang(wangzhijiang999)
Hi Haibo,

Thanks for bringing this discussion!

I reviewed the Google doc and really like the idea of unifying stream and batch across all stacks. Currently only the network runtime stack is unified for both stream and batch jobs, while the compilation, operator, and runtime task stacks are all separate. The stream stack has been developed actively and has dominated in recent years, while the batch stack was touched less. If they are unified into one stack, batch jobs can also benefit from all those improvements. I think it is a very big piece of work but worth doing. I have some concerns:

1. The current job graph generation for batch covers complicated optimizations such as cost-based estimation and planning. Would this part be retained when integrating with stream graph generation?

2. I saw some other special improvements for batch scenarios in the doc, such as input selection while reading. I acknowledge their roles in special batch scenarios, but they do not seem to be blockers for the unification itself, because current batch jobs can also work without them. So these further improvements could be split into individual topics after we first reach the unification of stream and batch.

Best,
Zhijiang




Re: [DISCUSS] Unified Core API for Streaming and Batch

jincheng sun
In reply to this post by Haibo Sun
Hi Haibo,

Thank you for this great proposal!

Flink is a unified computing engine. It has already been unified at the Table API
and SQL level (though not yet completely). It would be great if we can unify the
DataSet API and DataStream API as well.

I also want to convert the SQL and Table API to StreamTransformation,
because batch (SQL/Table API) and stream (SQL/Table API) can both use the Calcite
optimizer for query optimization, so we could bypass the DataSet optimizer for batch.
But for users who use the DataSet API directly, how do we solve the optimization problem?

Best,
Jincheng


Re: [DISCUSS] Unified Core API for Streaming and Batch

Haibo Sun
In reply to this post by Zhijiang(wangzhijiang999)
Thanks, Zhijiang.

For optimizations such as cost-based estimation, we still want to keep them in the DataSet layer,
but your suggestion is also worth considering.

As far as I know, such batch scenarios are already covered by DataSet, for example
the sort-merge join algorithm. So I think the unification should take features
like input selection at reading into account.
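
To make the input-selection point concrete: a sort-merge join has to control which of its
two sorted inputs advances next, which a purely push-based two-input stream task does not
let an operator do. The following is purely a hypothetical sketch of the shape such a hook
could take (no such interface exists in Flink today; names are made up for illustration):

public interface InputSelectable {

    // Which input the operator wants the task to read from next.
    enum InputSelection { FIRST, SECOND, ANY }

    // Called by the task before it reads the next record. A sort-merge
    // join, for example, would select the input whose current key is
    // smaller, so that both sorted inputs advance in lock step.
    InputSelection nextSelection();
}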


Best,
Haibo



Re: [DISCUSS] Unified Core API for Streaming and Batch

Stephan Ewen
Hi all!

This is a great discussion to start, and I agree with the idea behind it. We
should get started designing what the Flink stack should look like in the
future.

This discussion is very big, though, and from past experience, if the scope
is too big, the discussions end up falling apart when everyone goes into
different details.
So my suggestion would be to stage this discussion and take it aspect
by aspect, starting with what we want to expose to users and then going
into the internal details.

*Discussion (1) What should the API stack look like*
  - Relationship of DataStream, DataSet, and Table API
  - Where do we want automatic optimization, and where not
  - Future of DataSet (subsumed in data stream, or remains independent)
  - What happens with iterations
  - What happens with the collection execution mode

*Discussion (2) What should the abstractions look like.*
  - This is based on the outcome of (1)
  - Operator DAG
  - Operator Interface
  - what is sent to the REST API when a job is submitted, etc.
  - modules and dependency structure

*Discussion (3) What is special for Batch*
  - I would like to follow the philosophy that "batch allows us to activate
additional optimizations" made possible by the bounded nature of the inputs.
  - special-case scheduling
  - additional runtime algorithms (like hybrid hash joins)
  - no watermarks / late data / etc.
  - special casing in failover (or possibly not; it could still be the same
core mechanism)

What do you think?

Best,
Stephan




Re: [DISCUSS] Unified Core API for Streaming and Batch

Feng Wang
Hi Stephan,
I totally agree with you. This discussion covers too many topics, so we can cut it into the series of sub-discussions you proposed. First, we can focus on phase 1: “What Flink API Stack Should be for a Unified Engine”.
Best,
Feng Wang


Re: [DISCUSS] Unified Core API for Streaming and Batch

Kurt Young
Hi all,

Really excited to see this discussion happening. I also want to share
my two cents here.
Let's first focus on this question: “What Flink API Stack Should be for a
Unified Engine".

There are multiple ways to judge whether an engine is unified or not. From
a user's perspective, as long as it provides APIs for
both stream and batch processing, it can be considered unified. But that's
definitely not the way a developer sees it. I think developers
care more about the implementation. How much infrastructure is shared
between the different compute modes: network, functions,
or even operators? Sharing more things is not a free lunch; it can make
things more complicated, but the potential benefits are also
huge.

In Flink's current implementation, two things are shared. One is the
network stack, and the other is job scheduling. If we want to
push the unification effort a little further, the next thing we should consider is
tasks and operators (BatchTask and Driver in the current batch implementation).
Here are the benefits I can see if we try to unify them:
1. State becomes available to batch processing, giving batch failover more
possibilities.
2. Stream processing can borrow some efficient ideas from batch, such as
memory management and binary data processing.
3. Batch and stream operators can be mixed together to meet more complicated
computation requirements, such as progressive computation,
    which is neither pure stream processing nor pure batch processing.
4. All developers make a joint effort, working on the same technology stack no
matter whether they mainly focus on stream, batch, or even ML.

And once the operator API is unified, we can next consider having a more
formal DAG API for the various processing modes. I think we all agree on
the idea Flink is built upon: "stream is the basis; batch is just a
special case of streaming". I think it also makes sense to have the DAG API
mainly focus on describing a stream, whether it is bounded or not. I found
StreamTransformation a good fit for this requirement. It has no semantics;
it just tells you the physical transformation applied to the stream. Take
OneInputStreamTransformation: all we need to know is that it takes one stream
as input and has an operator to process the elements it receives, and the
output can be further transformed by another OneInputStreamTransformation,
or become one input of a TwoInputStreamTransformation. It describes how data
flows, but carries very limited information about how the data is processed,
whether it is mapped or flat-mapped.
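
As a small self-contained illustration (a sketch against the DataStream API as of
Flink 1.7, using the internal StreamMap operator): transform() accepts an arbitrary
OneInputStreamOperator and records only the physical wiring (input, operator,
produced type) as a transformation node of the DAG:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamMap;

public class TransformationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> words = env.fromElements("stream", "batch");

        // transform() only records how data flows: one input stream, an
        // operator, an output type. Whether the operator maps or
        // flat-maps is invisible at the transformation level.
        DataStream<Integer> lengths = words.transform(
                "length", Types.INT, new StreamMap<>(String::length));

        lengths.print();
        env.execute("transformation-sketch");
    }
}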

All the user APIs we now have (DataStream, DataSet, Table) carry semantics,
and some even contain optimizers. Based on these thoughts, I will try to
answer the questions Stephan raised:

  - Relationship of DataStream, DataSet, and Table API
I think these three APIs can stay independent for now: DataSet for pure batch
processing, DataStream for pure stream processing where you want to deal with
state explicitly, and Table API for relational data processing.

  - Where do we want automatic optimization, and where not
DataStream, DataSet, and Table API can all have their own optimizations,
but StreamTransformation does not.

  - Future of DataSet (subsumed in data stream, or remains independent)
I think it's better for it to remain independent for now and be subsumed into
DataStream in the future.

  - What happens with iterations
I think the more important question is how to describe iteration on stream
transformations: what information can be hidden, and what information must
be exposed to the transformation.

  - What happens with the collection execution mode
I think this mode can be fully replaced by the mini cluster.

Best,
Kurt



Re: [DISCUSS] Unified Core API for Streaming and Batch

Guowei Ma
Hi all,
Thanks to Haibo for initiating this discussion in the community.

  - Relationship of DataStream, DataSet, and Table API
Table, DataStream, and DataSet do have different aspects. For example,
DataStream can access state and Table cannot. DataStream can be easily
extended by users because they do not need to understand complex
optimization logic. On the other hand, as a developer, I think they need to
interoperate. When I develop a job, I want to use Table, DataStream, and
DataSet at the same time. Right now DataStream and DataSet can't be converted
to each other.

  - Future of DataSet (subsumed in data stream, or remains independent)
IMO, if a DataSet can be converted to a DataStream, it may make no
difference whether DataSet stays independent or is included in DataStream.

  - What happens with iterations
What iterations should look like seems a very interesting question.
There may be two options here: one based on compilation and one based on
interaction. Both options have benefits: with the compilation-based approach,
failover may be more controllable and scheduling more efficient, while the
interaction-based approach is more flexible, but fault tolerance can be a
bit more difficult. Choosing between these two methods may have a certain
impact on the subsequent operator API design. At this point, I want to hear
what the community thinks.





Re: [DISCUSS] Unified Core API for Streaming and Batch

Haibo Sun
In reply to this post by Kurt Young
Hi all,

Thanks Kurt, you see more benefits of the unification than I do.

I quite agree with Kurt's views: DataStream, DataSet, and Table remain
independent for now, with DataSet subsumed into DataStream in the future; the
collection execution mode is replaced by the mini cluster; and the high-level
semantic APIs have their own optimizations, but StreamTransformation does
not.

About iterations, I have no more ideas at the moment.


Best,
Haibo




Re: [DISCUSS] Unified Core API for Streaming and Batch

Shuai Xu
Hi all,
Glad to see this discussion. We are now working on a design to enhance the
scheduling of batch jobs, and a unified API will help a lot.


Re: [DISCUSS] Unified Core API for Streaming and Batch

Aljoscha Krettek
Hi All,

this is a great discussion! (I have some thoughts on most of the topics but I'll wait for the separate discussion threads)

@Haibo Will you start separate threads? I think the separate discussion topics would be (based on Stephan's mail but further split up):

1. What should the API stack look like?
2. What should the interface for a single operator look like, i.e. what will StreamOperator look like?
3. What does a job look like, i.e. the graph of operations. Maybe a proper serialized format for DAGs.
4. Modules and dependency structure. This is currently a bit messed up for flink-streaming, which depends on flink-runtime
5. What's special for batch.

There are some interdependencies, i.e. 2 depends on 5 and maybe on 1.

Best,
Aljoscha


Re: [DISCUSS] Unified Core API for Streaming and Batch

Haibo Sun
Hi All,

Thanks Aljoscha for further splitting up the topics.

I will start a separate thread on each of the topics you proposed.

Best,
Haibo


