Hi guys,
I propose a design to enhance the Stream Operator API for Batch's requirements. This also serves Flink's goal of making Batch a special case of Streaming. The proposal mainly contains two changes to the operator API:

1. Allow a "StreamOperator" to choose which input to read;
2. Notify a "StreamOperator" that an input has ended.

This proposal was discussed offline with Piotr Nowojski, Kostas Kloudas, and Haibo Sun. It would be great to hear feedback and suggestions from the community. Please kindly share your comments.

Best
GuoWei Ma

Enhance Operator API to Support Dynamically Sel...
<https://docs.google.com/document/d/10k5pQm3SkMiK5Zn1iFDqhQnzjQTLF0Vtcbc8poB4_c8/edit?usp=drive_web>
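For illustration, here is a minimal sketch of what the two proposed additions could look like in Java. The interface name follows the "TwoInputSelectable" wording used later in this thread; the exact method signatures are assumptions, not the API from the design document.

// Illustrative sketch; exact signatures are assumptions, not the proposed API itself.
public interface TwoInputSelectable {

    /** Which input the operator wants the runtime to read from next. */
    enum InputSelection { FIRST, SECOND, ANY }

    /** Called by the task before it polls the next record. */
    InputSelection nextSelection();

    /** Called exactly once per input (1 or 2) when that input has no more data. */
    void endInput(int inputId);
}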
Nice design proposal, and +1 to the general idea.
A few thoughts / suggestions:

*binary vs. n-ary*

I would plan ahead for N-ary operators. Not because we necessarily need n-ary inputs (one can probably build that purely in the API), but because of future side inputs. The proposal should be able to handle that as well.

*enum vs. integer*

The above might be easier to realize when going directly with an integer and having ANY, FIRST, SECOND, etc. as pre-defined constants. Performance-wise, it probably makes no difference whether we use int or enum.

*generic selectable interface*

From the proposal, I don't quite understand what that interface is for. My understanding is that the input processor or task that calls the operator's functions would work on the TwoInputStreamOperator interface anyways, for efficiency.

*end-input*

I think we should not make storing the end-input the operator's responsibility. There is a simple way to handle this, which is also consistent with other aspects of handling finished tasks:

- If a task is finished, that should be stored in the checkpoint.
- Upon restoring a finished task, if it still has running successors, we deploy a "finished input channel", which immediately sends the "end of input" when the task is started.
- The operator will hence receive the end of input immediately again upon restore.

*early-out*

Letting nextSelection() return "NONE" or "FINISHED" may be relevant for early-out cases, but I would remove this from the scope of this proposal. There are most likely other big changes involved, like communicating this to the upstream operators.

*distributed stream deadlocks*

We had this issue in the DataSet API. Earlier versions of the DataSet API analyzed the flow for dams and for whether the pipeline-breaking behavior in the flow would cause deadlocks, and introduced artificial pipeline breakers in response.

The logic was really complicated and it took a while to become stable. We had several issues where certain user functions (like mapPartition) could either be pipelined or have a full dam (not possible for the system to know), so we had to insert artificial pipeline breakers in all paths.

In the end we simply decided that in the case of a diamond-style flow, we make the point where the flow first forks a blocking shuffle. That was super simple, solved all issues, and has the additional nice property that it is a great point to materialize data for recovery, because it helps both paths of the diamond upon failure.

My suggestion:
==> For streaming, no problem so far, nothing to do.
==> For batch, I would suggest going with the simple solution described above first, and improving it when we see cases where this impacts performance significantly.

*empty input / selection timeout*

I can see that being relevant in future streaming cases, for example with side inputs. You want to wait for the side input data, but with a timeout, so the program can still proceed with non-perfect context data in case that context data is very late.

Because we do not support side inputs at the moment, we may want to defer this for now. Let's not over-design for problems that are not well understood at this point.

*timers*

I don't understand the problem with timers. Timers are bound to the operator, not the input, so they should still work if an input ends. There are cases where some state in the operator is only relevant as long as an input still has data (like in symmetric joins), and the timers are relevant to that state. When that state is dropped, the timers should also be dropped, but that is the operator's logic in "endInput()".
So there is no inherent issue between inputs and timers.

Best,
Stephan
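As an illustration of the integer-based selection suggested above (the constant names and values here are assumptions), plain ints extend naturally to N-ary operators and future side inputs without changing the type:

// Illustrative sketch only: integer-based input selection that generalizes to N inputs.
public interface InputSelectable {

    // Pre-defined constants for the common cases; the values are assumptions.
    int ANY = -1;     // no preference, read whichever input has data
    int FIRST = 0;
    int SECOND = 1;
    // inputs beyond the second are simply addressed by their index: 2, 3, ...

    /** Returns the index of the input to read next, or ANY for no preference. */
    int nextSelection();
}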
2019.2.10
Hi Stephan,

Thank you very much for such detailed and constructive comments.

*binary vs. n-ary* and *enum vs. integer*

Considering the N-ary case, as you mentioned, using integers may be the better choice.

*generic selectable interface*

You are right. This interface can be removed.

*end-input*

It is true that the operator does not need to store the end-input state; it can be inferred by the system, which notifies the operator at the right time. We can adopt this mechanism once the system can checkpoint a topology that contains finished tasks.

*early-out*

It is reasonable to me not to consider this situation at present.

*distributed stream deadlocks*

At present there is no deadlock for streaming, but I think it might still be necessary to do some validation (warn or reject) on the JobGraph. Once Flink introduces this TwoInputSelectable interface, a streaming user could also construct a diamond-style topology that may deadlock.

*empty input / selection timeout*

It is reasonable to me not to consider this situation at present.

*timers*

When all the inputs are finished, the TimeService will wait until all timers are triggered, so there should be no problem. A few others and I are confirming the details to see if there are other considerations.

Best
GuoWei
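A rough sketch of the kind of JobGraph validation mentioned above, written against a simplified graph model rather than Flink's actual JobGraph classes: warn when both inputs of a selectable two-input node trace back to a common upstream node over pipelined edges, i.e. a diamond that could deadlock if the operator stops reading one side.

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical validation sketch; not Flink code.
public final class DiamondCheck {

    /**
     * Returns true if the two inputs of a selectable two-input node share a common
     * upstream node over pipelined edges.
     *
     * @param upstream adjacency map: node -> its direct pipelined upstream nodes
     */
    static boolean hasPipelinedDiamond(Map<String, List<String>> upstream,
                                       String leftInput, String rightInput) {
        Set<String> leftAncestors = reachableUpstream(upstream, leftInput);
        Set<String> rightAncestors = reachableUpstream(upstream, rightInput);
        leftAncestors.retainAll(rightAncestors);
        return !leftAncestors.isEmpty();   // common ancestor => diamond
    }

    private static Set<String> reachableUpstream(Map<String, List<String>> upstream, String start) {
        Set<String> seen = new HashSet<>();
        Deque<String> stack = new ArrayDeque<>();
        stack.push(start);
        while (!stack.isEmpty()) {
            String node = stack.pop();
            if (seen.add(node)) {
                stack.addAll(upstream.getOrDefault(node, Collections.emptyList()));
            }
        }
        return seen;
    }

    private DiamondCheck() {}
}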
To move this forward, I would suggest the following:
- Let's quickly check which other classes need to change. I assume the TwoInputStreamTask and StreamTwoInputProcessor?
- Can those changes be new classes that are used when the new operator is used? The current TwoInputStreamTask and StreamTwoInputProcessor would remain until they are fully subsumed and are then removed.
- Do we need any other refactorings before that, like some cleanup of the Operator Config or the Operator Chain?

Best,
Stephan
While we’re on operators and tasks, I think it would also make sense in the long run to move the logic that is now in AbstractStreamOperator.setup()/initializeState()/snapshot()/snapshotState() (and the other snapshotState()…)/dispose() outside of the operator itself. This logic is the same for every operator but shouldn’t really be in there. We currently have a very complicated dance between the StreamTask and AbstractStreamOperator for initialising the state backends that doesn’t really seem necessary.
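A sketch in the spirit of this suggestion (all names and types below are hypothetical, not existing Flink classes): the task would build the state backends and timer service once and hand the operator a ready-to-use context, instead of the operator driving that setup itself.

// Hypothetical sketch only; these are not existing Flink interfaces.
public interface SimplifiedStreamOperator {

    /** Built by the task; the operator never constructs backends itself. */
    interface StateContext {
        Object keyedStateBackend();     // placeholder return types, purely illustrative
        Object operatorStateBackend();
        Object timerService();
    }

    /** The task passes in fully initialized state; the operator only restores its own fields. */
    void initialize(StateContext context) throws Exception;

    /** The task drives the snapshot; the operator only contributes its own state. */
    void snapshotState(long checkpointId) throws Exception;

    void close() throws Exception;
}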
In reply to this post by Stephan Ewen
Which classes need to be changed or use new classes? I'm working on a design for the Flink runtime to support TwoInputSelectable, and I'll share an initial proposal document next Monday. In that proposal, the core classes that need to change include UnionInputGate and StreamTwoInputProcessor. I think the discussion that follows can be based on this initial proposal.

Do we need other refactoring? Judging from the code with which we have already implemented this functionality, no other refactoring needs to be done beforehand.

Best,
Haibo
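A rough sketch of how a selectable two-input processing loop could use nextSelection() and endInput(); the class and method names below are assumptions, not the actual StreamTwoInputProcessor, and a real implementation would block on input availability instead of polling.

// Hypothetical sketch of the task-side loop; not Flink code.
final class SelectableInputProcessorLoop {

    interface Input<T> {
        /** Returns the next record, or null if no record is currently available. */
        T poll();
        boolean isFinished();
    }

    interface SelectableOperator<IN1, IN2> {
        int nextSelection();              // 0 = first input, 1 = second, -1 = any
        void processElement1(IN1 value) throws Exception;
        void processElement2(IN2 value) throws Exception;
        void endInput(int inputId) throws Exception;
    }

    static <IN1, IN2> void run(Input<IN1> in1, Input<IN2> in2,
                               SelectableOperator<IN1, IN2> op) throws Exception {
        boolean end1 = false, end2 = false;
        while (!(end1 && end2)) {
            int selection = op.nextSelection();
            // Prefer the selected input; fall back to the other one if it has already ended.
            boolean readFirst = end2 || ((selection == 0 || selection == -1) && !end1);
            if (readFirst) {
                IN1 rec = in1.poll();
                if (rec != null) {
                    op.processElement1(rec);
                } else if (in1.isFinished()) {
                    end1 = true;
                    op.endInput(0);       // notify exactly once when the first input ends
                }
                // a real implementation would block here until data is available
            } else {
                IN2 rec = in2.poll();
                if (rec != null) {
                    op.processElement2(rec);
                } else if (in2.isFinished()) {
                    end2 = true;
                    op.endInput(1);
                }
            }
        }
    }
}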
In reply to this post by Aljoscha Krettek-2
I agree with you. In the long run, we should clearly define which parts should be exposed to users. At present, AbstractStreamOperator exposes a lot of concrete implementation details, such as the internals of the asynchronous checkpoint ("OperatorSnapshotFutures") and how to release resources needed by the implementation. These parts should be transparent to the user, and they also make the client side depend on the runtime implementation. Ideally, users would not see the AbstractStreamOperator class at all; they should only see the StreamOperator / OneInputStreamOperator / TwoInputStreamOperator interfaces.