Hey Guys,
I think it would make sense to turn lazy operator execution off for streaming programs because it would make life simpler for windowing. I have also created a JIRA issue here: <https://issues.apache.org/jira/browse/FLINK-1425>.

Can anyone give me some quick pointers on how to do this? It's probably simple, I am just not familiar with that part of the code. (Or maybe it's so easy that someone could pick this up :) )

By the way, do you see any reasons why we should not do this?

Thank you!
Gyula
Hey Gyula,
On 21 Jan 2015, at 15:41, Gyula Fóra <[hidden email]> wrote:

> Can anyone give me some quick pointers on how to do this? It's probably simple,
> I am just not familiar with that part of the code.

Have a look at the JobManager ScheduleOrUpdateConsumers message, which is how it is done currently. The first produced buffer of an intermediate result triggers this message. I think the cleanest solution would be to do this directly when scheduling a streaming job?

> By the way, do you see any reasons why we should not do this?

ATM, I don't.
Thank you! I will play around with it.
On Wed, Jan 21, 2015 at 3:50 PM, Ufuk Celebi <[hidden email]> wrote:

> Have a look at the JobManager ScheduleOrUpdateConsumers message, which is
> how it is done currently.
I think that this is a fairly delicate thing.
The execution graph / scheduling is the most delicate part of the system. I would not feel too comfortable about a quick fix there, so let's think this through a little bit.

The logic currently does the following:

1) It schedules the sources (see "ExecutionGraph.scheduleForExecution()").

2) Successors of operators are scheduled when the intermediate result partition / queue tells the master that data is available.

3) The successor requests the stream from the producer.

Possible changes:

- We could definitely change the "ExecutionGraph.scheduleForExecution()" method to deploy all tasks immediately. I would suggest attaching a "schedule mode" to the JobGraph that defines how to do that. The mode could have the values FROM_SOURCES, ALL, and BACK_TRACKING (FROM_SOURCES is what we do right now, BACK_TRACKING is what we will do in the next release, and ALL is what you need).

- When all tasks are scheduled immediately, it may be that for a channel the sender is not yet deployed when the receiver is deployed. That should be okay, since the same can happen right now when all-to-all patterns connect the tasks.

- The queues would still send notifications to the JobManager that data is available, but the JM will see that the target task is already deployed (or currently being deployed). Then the info where to grab a channel from would need to be sent to the task. That mechanism also exists already.

@Ufuk: It seems that it may actually work to simply kick off the deployment of all tasks immediately (in the "ExecutionGraph.scheduleForExecution()" method). Do you see any other implications?

Greetings,
Stephan
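To make the proposal concrete, here is a minimal, self-contained sketch of what such a schedule mode could look like. All names here (ScheduleMode, ExecutionGraphSketch, the helper methods) are made up for illustration and are not the actual Flink classes:

    // Hypothetical sketch of a schedule mode attached to the job graph.
    // Names and structure are illustrative, not the real Flink code.
    enum ScheduleMode {
        FROM_SOURCES,   // current behavior: schedule sources, successors follow lazily
        ALL,            // deploy every task immediately (what streaming needs)
        BACK_TRACKING   // planned for a later release
    }

    class ExecutionGraphSketch {

        private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;

        void setScheduleMode(ScheduleMode mode) {
            this.scheduleMode = mode;
        }

        void scheduleForExecution() {
            switch (scheduleMode) {
                case FROM_SOURCES:
                    scheduleSources();            // lazy: successors are scheduled once data shows up
                    break;
                case ALL:
                    scheduleAllVerticesEagerly(); // eager: kick off deployment of all tasks at once
                    break;
                case BACK_TRACKING:
                    throw new UnsupportedOperationException("not implemented yet");
            }
        }

        private void scheduleSources() { /* schedule only the source vertices */ }

        private void scheduleAllVerticesEagerly() { /* deploy every execution vertex immediately */ }
    }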
I'm not sure whether it is currently possible to schedule first the receiver and then the sender. Recently, I had to fix the TaskManagerTest.testRunWithForwardChannel test case where this was exactly the case. Because the receiver was scheduled first, an IllegalQueueIteratorRequestException was sometimes thrown in IntermediateResultPartitionManager.getIntermediateResultPartitionIterator. The partition manager complained that the producer execution ID was unknown. I assume that this has to be fixed first in order to schedule all tasks immediately. But Ufuk will probably know it better.

Greets,
Till
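To illustrate the race Till describes, here is a small self-contained toy model (hypothetical types, not the real IntermediateResultPartitionManager): the consumer's request fails if it arrives before the producer has registered its result partition.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Toy model of the race: a consumer asks the partition manager for a result
    // partition by producer execution ID before the producer has registered it.
    class PartitionManagerSketch {

        private final Map<String, String> partitionsByProducerId = new ConcurrentHashMap<>();

        // Called by the producer task once its result partition / queue exists.
        void registerPartition(String producerExecutionId, String partition) {
            partitionsByProducerId.put(producerExecutionId, partition);
        }

        // Called by the consumer task when it requests the stream.
        String requestPartition(String producerExecutionId) {
            String partition = partitionsByProducerId.get(producerExecutionId);
            if (partition == null) {
                // Analogous to the IllegalQueueIteratorRequestException above: the
                // producer execution ID is unknown because the producer has not
                // been deployed, or has not registered its queues yet.
                throw new IllegalStateException(
                        "Unknown producer execution ID: " + producerExecutionId);
            }
            return partition;
        }
    }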
On 22 Jan 2015, at 11:37, Till Rohrmann <[hidden email]> wrote:
> I assume that this has to be fixed first in order to schedule all tasks
> immediately. But Ufuk will probably know it better.

On 21 Jan 2015, at 20:58, Stephan Ewen <[hidden email]> wrote:

> - The queues would still send notifications to the JobManager that data is
> available, but the JM will see that the target task is already deployed (or
> currently being deployed). Then the info where to grab a channel from would
> need to be sent to the task. That mechanism also exists already.

The only minor thing that needs to be adjusted is this mechanism. It is indeed in place already (e.g. UNKNOWN input channels are updated at runtime to LOCAL or REMOTE input channels depending on the producer location), but currently the consumer tasks assume that the consumed intermediate result partition has already been created when they (the consumer tasks) are deployed and request the partition. When we schedule all tasks at once, we might end up in situations like the test case Till described, where we know that it is a LOCAL or REMOTE channel, but the intermediate result has not been created yet and the request fails.

tl;dr: Channels can be updated at runtime, but requests need to arrive after the producer has created the partition.
So the crux is that the JobManager has a location for the sender task (and tells that to the receivers) before the senders have registered their transfer queues.

Can we just establish a "happens-before" there?

- The TaskManager may send the "ack" for the deployment call only after all queues are registered (it might even be like this now).
- The JobManager updates receivers only with locations of senders that have switched to "running", not with ones that are still in "deploying".

Would that fix it?
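A rough sketch of that second point, again with made-up names: the JobManager hands out a concrete sender location only once the sender is RUNNING, and otherwise leaves the channel UNKNOWN, to be updated at runtime.

    // Illustrative only: how the JobManager could decide which channel information
    // to put into a consumer's deployment descriptor.
    enum ProducerState { DEPLOYING, RUNNING }

    enum ChannelType { LOCAL, REMOTE, UNKNOWN }

    class ChannelAssignmentSketch {

        ChannelType channelTypeFor(ProducerState producerState,
                                   String producerTaskManager,
                                   String consumerTaskManager) {
            if (producerState != ProducerState.RUNNING) {
                // The producer's queues may not be registered yet, so defer the
                // decision; the channel is updated at runtime once it is RUNNING.
                return ChannelType.UNKNOWN;
            }
            return producerTaskManager.equals(consumerTaskManager)
                    ? ChannelType.LOCAL
                    : ChannelType.REMOTE;
        }
    }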
On 22 Jan 2015, at 18:10, Stephan Ewen <[hidden email]> wrote:
> Can we just establish a "happens-before" there?
> [...]
> Would that fix it?

I've just looked into it, and it turns out that I had already implemented this in the current master: as long as the producer has not changed to RUNNING, the consumer will be scheduled with an UNKNOWN input channel. The problem that Till mentioned was specific to the test case, where the channel was hard-coded to a local input channel.

@Gyula: I've implemented the schedule mode as suggested by Stephan here:
https://github.com/uce/incubator-flink/tree/schedule-all

You can set the schedule mode with

    jobGraph.setScheduleMode(ScheduleMode.ALL)

I have just tested it with a small test case in JobManagerITCase. If you have time, you could try it out with some of the streaming programs. If this is what you need, I can open a PR later. :)
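For reference, a minimal usage sketch of the branch above. The import path for ScheduleMode and the JobGraph(String jobName) constructor are assumptions here and may differ from the actual branch:

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.ScheduleMode; // assumed package in the branch

    public class ScheduleAllExample {
        public static void main(String[] args) {
            // Assumes the JobGraph(String jobName) constructor; adjust as needed.
            JobGraph jobGraph = new JobGraph("My streaming job");

            // Deploy all tasks immediately instead of lazily from the sources.
            jobGraph.setScheduleMode(ScheduleMode.ALL);

            // ... add vertices and submit the job as usual ...
        }
    }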
Great, thanks! It was fast :)
I will try to check it out in the afternoon!

Gyula
Hey,
I checked it with our examples, and it seems to be working properly and exactly how we wanted :).

One minor thing: we have a JIRA open for this, so if you could reference it in the commit message, that would be good: https://issues.apache.org/jira/browse/FLINK-1425

Thanks!
Gyula