[DISCUSS] Enhance Operator API to Support Dynamically Selective Reading and EndOfInput Event

[DISCUSS] Enhance Operator API to Support Dynamically Selective Reading and EndOfInput Event

Guowei Ma
Hi all,
I propose a design to enhance the Stream Operator API for batch's requirements.
This also serves Flink's goal of making batch a special case of streaming. The
proposal mainly contains two changes to the operator API (a rough interface
sketch follows the list):

1. Allow a "StreamOperator" to choose which input to read;
2. Notify a "StreamOperator" that an input has ended.

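To make the two changes concrete, here is a minimal, hypothetical sketch of
what such an operator interface could look like. All names (InputSelection,
nextSelection, endInput) are illustrative assumptions, not the API from the
design document.

// Hypothetical sketch only -- names and shapes are illustrative.
public interface TwoInputSelectableStreamOperator<IN1, IN2, OUT> {

    /** Called before reading the next record; tells the runtime which input to read. */
    InputSelection nextSelection();

    /** Called exactly once per input when that input has no more data. */
    void endInput(int inputId) throws Exception;

    void processElement1(IN1 value) throws Exception;

    void processElement2(IN2 value) throws Exception;
}

/** Possible selections for a two-input operator. */
enum InputSelection {
    FIRST,   // read only from input 1
    SECOND,  // read only from input 2
    ANY      // no preference; the runtime picks whichever input has data
}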

This proposal was discussed offline with Piotr Nowojski, Kostas Kloudas, and
Haibo Sun. It would be great to hear feedback and suggestions from the
community, so please share your comments.

Best
GuoWei Ma.

 Enhance Operator API to Support Dynamically Sel...
<https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=drive_web>

Re: [DISCUSS] Enhance Operator API to Support Dynamically Selective Reading and EndOfInput Event

Stephan Ewen
Nice design proposal, and +1 to the general idea.

A few thoughts / suggestions:

*binary vs. n-ary*

I would plan ahead for N-ary operators. Not because we necessarily need
n-ary inputs (one can probably build that purely in the API) but because of
future side inputs. The proposal should be able to handle that as well.

*enum vs. integer*

The above might be easier to realize when going directly with an integer and
having ANY, FIRST, SECOND, etc. as pre-defined constants.
Performance-wise, it probably makes no difference whether we use an int or an enum.
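As an illustration only (not the proposed API), the int-based variant could
look like the following sketch; it generalizes naturally to n-ary operators
and future side inputs:

// Sketch of int-based selection constants; purely illustrative.
public final class InputSelections {

    public static final int ANY = -1;    // read from whichever input has data
    public static final int FIRST = 0;   // read only from input 0
    public static final int SECOND = 1;  // read only from input 1
    // an n-ary operator can simply return any valid input index 0..n-1

    private InputSelections() {}
}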

*generic selectable interface*

From the proposal, I don't quite understand what that interface is for. My
understanding is that the input processor or task that calls the operator's
functions would anyway work on the TwoInputStreamOperator interface, for
efficiency.

*end-input*

I think we should not make storing the end-input state the operator's
responsibility.
There is a simple way to handle this, which is also consistent with other
aspects of handling finished tasks:

  - If a task is finished, that should be stored in the checkpoint.
  - Upon restoring a finished task, if it still has running successors, we
deploy a "finished input channel", which immediately sends the "end of
input" when the task is started.
  - The operator will hence receive the end of input immediately again upon
restore (see the sketch below).
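A minimal sketch of that restore path, assuming the checkpoint records which
inputs had already finished; the type and method names are made up for
illustration and refer to the interface sketch in the first mail:

import java.util.Set;

// Sketch only: on restore, the task replays "end of input" for every input the
// checkpoint recorded as finished, so the operator never persists that flag itself.
final class FinishedInputRestoreSketch {

    static void restoreFinishedInputs(
            Set<Integer> finishedInputsFromCheckpoint,
            TwoInputSelectableStreamOperator<?, ?, ?> operator) throws Exception {

        for (int inputId : finishedInputsFromCheckpoint) {
            // a "finished input channel" delivers no records, only this notification
            operator.endInput(inputId);
        }
    }
}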

*early-out*

Letting nextSelection() return "NONE" or "FINISHED" may be relevant for
early-out cases, but I would remove this from the scope of this proposal.
There are most likely other big changes involved, like communicating this
to the upstream operators.

*distributed stream deadlocks*

We had this issue in the DataSet API. Earlier versions of the DataSet API
made an analysis of the flow, detecting dams and whether the pipeline-breaking
behavior in the flow would cause deadlocks, and introduced artificial pipeline
breakers in response.

The logic was really complicated and it took a while to become stable. We
had several issues where certain user functions (like mapPartition) could
either be pipelined or have a full dam (not possible for the system to know),
so we had to insert artificial pipeline breakers in all paths.

In the end we simply decided that in the case of a diamond-style flow, we
make the point where the flow first forks a blocking shuffle. That was
super simple, solved all issues, and has the additional nice property that
it is a great point to materialize data for recovery, because it helps both
paths of the diamond upon failure.

My suggestion:
==> For streaming, there is no problem so far, so nothing to do.
==> For batch, I would suggest going with the simple solution described above
first, and improving it when we see cases where this impacts performance
significantly.

*empty input / selection timeout*

I can see that being relevant in future streaming cases, for example with
side inputs. You want to wait for the side input data, but with a timeout,
so the program can still proceed with non-perfect context data in case the
context data is very late.

Because we do not support side inputs at the moment, we may want to defer
this for now. Let's not over-design for problems that are not well
understood at this point.

*timers*

I don't understand the problem with timers. Timers are bound to the
operator, not the input, so they should still work if an input ends.
There are cases where some state in the operator is only relevant as long
as an input still has data (like in symmetric joins), and the timers are
relevant to that state.
When the state is dropped, the timers should also be dropped, but that is
the operator's logic in "endInput()". So there is no inherent issue between
inputs and timers.
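As an illustration (a hedged sketch, not an actual Flink operator), endInput()
is the place where the operator itself drops the per-input state and the
timers that guarded it:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: a symmetric-join-style operator where endInput() drops the state
// (and the timers guarding it) that was only needed while the other input could
// still deliver matching records. All names are illustrative, not Flink API.
class JoinLikeOperatorSketch<L, R> {

    private final List<L> bufferedLeft = new ArrayList<>();   // kept to join against future right records
    private final List<R> bufferedRight = new ArrayList<>();  // kept to join against future left records
    private final Set<Long> leftCleanupTimers = new HashSet<>();
    private final Set<Long> rightCleanupTimers = new HashSet<>();

    /** Called by the runtime when input 1 or 2 has ended. */
    public void endInput(int inputId) {
        if (inputId == 1) {
            // no more records from input 1, so input-2 buffers that were only kept
            // to join against future input-1 records can be dropped, timers included
            bufferedRight.clear();
            rightCleanupTimers.forEach(this::deleteTimer);
            rightCleanupTimers.clear();
        } else {
            bufferedLeft.clear();
            leftCleanupTimers.forEach(this::deleteTimer);
            leftCleanupTimers.clear();
        }
    }

    private void deleteTimer(long timestamp) {
        // placeholder for a call into the operator's timer service
    }
}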

Best,
Stephan



Re: [DISCUSS] Enhance Operator API to Support Dynamically Selective Reading and EndOfInput Event

Guowei Ma
2019.2.10


Hi Stephan,


Thank you very much for such detailed and constructive comments.


*binary vs. n-ary* and *enum vs. integer*


Considering the n-ary case, as you mentioned, using integers may be the
better choice.


*generic selectable interface*


You are right. This interface can be removed.


*end-input*

It is true that the operator does not need to store the end-input state; the
system can infer it and notify the operator at the right time. We can consider
using this mechanism once the system can checkpoint a topology that contains
finished tasks.


*early-out*

I agree it is reasonable not to consider this case at present.


*distributed stream deadlocks*


At present there is no deadlock for streaming, but I think it might still be
necessary to do some validation (warn or reject) on the JobGraph, because once
Flink introduces the TwoInputSelectable interface, a streaming user could also
construct a diamond-style topology that may deadlock.
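A hedged sketch of what such a JobGraph validation could check (a plain
reachability test over adjacency lists, not Flink's actual JobGraph classes):
flag any fork whose branches meet again downstream, since such a diamond could
deadlock once one branch is blocked by selective reading.

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: detect diamond shapes in a DAG given as adjacency lists. A diamond
// exists if two different successors of the same node can reach a common node.
final class DiamondCheckSketch {

    static boolean hasDiamond(Map<Integer, List<Integer>> successors) {
        for (List<Integer> outs : successors.values()) {
            for (int i = 0; i < outs.size(); i++) {
                Set<Integer> fromI = reachable(outs.get(i), successors);
                for (int j = i + 1; j < outs.size(); j++) {
                    Set<Integer> fromJ = reachable(outs.get(j), successors);
                    fromJ.retainAll(fromI);
                    if (!fromJ.isEmpty()) {
                        return true;  // two branches of the same fork meet again
                    }
                }
            }
        }
        return false;
    }

    private static Set<Integer> reachable(int start, Map<Integer, List<Integer>> successors) {
        Set<Integer> seen = new HashSet<>();
        Deque<Integer> stack = new ArrayDeque<>();
        seen.add(start);
        stack.push(start);
        while (!stack.isEmpty()) {
            for (int next : successors.getOrDefault(stack.pop(), Collections.emptyList())) {
                if (seen.add(next)) {
                    stack.push(next);
                }
            }
        }
        return seen;
    }
}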


*empty input / selection timeout*

I agree it is reasonable not to consider this case at present.


*timers*

When all the inputs are finished, the timer service will wait until all timers
have been triggered, so there should be no problem. Others and I are confirming
the details to see if there are other considerations.


Best

GuoWei


Re: [DISCUSS] Enhance Operator API to Support Dynamically Selective Reading and EndOfInput Event

Stephan Ewen
To move this forward, I would suggest the following:

  - Let's quickly check which other classes need to change. I assume the
TwoInputStreamTask and StreamTwoInputProcessor?
  - Can those changes be new classes that are used when the new operator is
used? The current TwoInputStreamTask and StreamTwoInputProcessor would remain
until they are fully subsumed and are then removed (a rough sketch of such a
dispatch follows below).

  - Do we need any other refactorings beforehand, like some cleanup of the
Operator Config or the Operator Chain?
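For illustration, a hedged sketch of how the new code path could stay isolated
from the existing one (all class names here are placeholders, not the actual
Flink classes):

// Sketch only: the task chooses a selection-aware processor solely when the
// operator opts in, so the current two-input classes remain untouched until
// the new path fully subsumes them. Names are placeholders.
final class InputProcessorFactorySketch {

    interface InputProcessor {
        void processInput() throws Exception;
    }

    static InputProcessor create(Object operator) {
        if (operator instanceof TwoInputSelectableStreamOperator) {
            return new SelectionAwareProcessor();  // new code path
        }
        return new LegacyProcessor();              // existing code path, unchanged
    }

    static final class SelectionAwareProcessor implements InputProcessor {
        @Override
        public void processInput() {
            // would honor nextSelection() before reading from an input
        }
    }

    static final class LegacyProcessor implements InputProcessor {
        @Override
        public void processInput() {
            // would keep the current any-input reading behavior
        }
    }
}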

Best,
Stephan



Re: [DISCUSS] Enhance Operator API to Support Dynamically Selective Reading and EndOfInput Event

Aljoscha Krettek-2
While we're on operators and tasks, I think it would also make sense in the
long run to move the logic that is now in
AbstractStreamOperator.setup()/initializeState()/snapshot()/snapshotState()
(and the other snapshotState()…)/dispose() outside of the operator itself.
This logic is the same for every operator but shouldn't really be in there.
We currently have a very complicated dance between the StreamTask and
AbstractStreamOperator for initialising the state backends that doesn't
really seem necessary.



Re: [DISCUSS] Enhance Operator API to Support Dynamically Selective Reading and EndOfInput Event

Haibo Sun
In reply to this post by Stephan Ewen


Which classes need to be changed or use new classes?


I'm working on a design for Flink runtime support of TwoInputSelectable, and
I'll provide an initial proposal document next Monday. In the proposal, the
core classes that need to be changed include UnionInputGate and
StreamTwoInputProcessor. I think the discussion that follows can be based on
this initial proposal.


Do we need other refactoring?


Judging from the code with which we have already implemented this
functionality, no other refactoring needs to be done beforehand.


Best,
Haibo

Re: [DISCUSS] Enhance Operator API to Support Dynamically Selective Reading and EndOfInput Event

Guowei Ma
In reply to this post by Aljoscha Krettek-2
I agree with you. In the long run, we should clearly define which parts
should be exposed to users.
At present, AbstractStreamOperator exposes a lot of concrete implementation
details, such as the details of the asynchronous checkpoint
("OperatorSnapshotFutures") and how to release resources needed by the
implementation. These parts should be transparent to the user, and they also
make the client side depend on the runtime implementation. Ideally, users
would not see the AbstractStreamOperator class at all and would only see the
StreamOperator/OneInputStreamOperator/TwoInputStreamOperator interfaces.
