Turn lazy operator execution off for streaming jobs

Gyula Fóra
Hey Guys,

I think it would make sense to turn lazy operator execution off for
streaming programs because it would make life simpler for windowing. I
have also created a JIRA issue for this:
<https://issues.apache.org/jira/browse/FLINK-1425>.

Can anyone give me some quick pointers on how to do this? It's probably
simple, I am just not familiar with that part of the code. (Or maybe it's
so easy that someone could pick this up :) )

By the way, do you see any reasons why we should not do this?

Thank you!
Gyula

Re: Turn lazy operator execution off for streaming jobs

Ufuk Celebi
Hey Gyula,

On 21 Jan 2015, at 15:41, Gyula Fóra <[hidden email]> wrote:

> Hey Guys,
>
> I think it would make sense to turn lazy operator execution off for
> streaming programs because it would make life simpler for windowing. I
> have also created a JIRA issue for this:
> <https://issues.apache.org/jira/browse/FLINK-1425>.
>
> Can anyone give me some quick pointers on how to do this? It's probably
> simple, I am just not familiar with that part of the code. (Or maybe it's
> so easy that someone could pick this up :) )

Have a look at the JobManager's ScheduleOrUpdateConsumers message, which is how it is done currently. The first produced buffer of an intermediate result triggers this message. I think the cleanest solution would be to do this directly when scheduling a streaming job?
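
For reference, the current flow looks roughly like this (ScheduleOrUpdateConsumers is the actual message name; everything else below is just illustrative):

    // Sketch of the lazy-scheduling flow described above. Only the
    // ScheduleOrUpdateConsumers message name is real; the rest is made up.
    void onFirstBufferProduced(IntermediateResultPartition partition) {
        // The producer-side queue notifies the JobManager as soon as the
        // first buffer of the intermediate result partition is available.
        jobManager.tell(new ScheduleOrUpdateConsumers(jobId, partition.getPartitionId()));
    }
    // The JobManager then either deploys the consumer tasks (if they are
    // not running yet) or tells already-deployed consumers where to
    // request the partition from.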

> By the way, do you see any reasons why we should not do this?

ATM, I don't.

Re: Turn lazy operator execution off for streaming jobs

Gyula Fóra
Thank you! I will play around with it.


Re: Turn lazy operator execution off for streaming jobs

Stephan Ewen
I think that this is a fairly delicate thing.

The execution graph / scheduling is the most delicate part of the system. I
would not feel too good about a quick fix there, so let's think this
through a little bit.

The logic currently does the following:

1) It schedules the sources (see "ExecutionGraph.scheduleForExecution()")

2) Successors of operators are scheduled when the intermediate result
partition / queue tells the master that data is available.

3) The successor requests the stream from the producer.


Possible changes:
 - We could definitely change the "ExecutionGraph.scheduleForExecution()"
method to deploy all tasks immediately. I would suggest to have a "schedule
mode" attached to the JobGraph that defines how to do that. The mode could
have the values FROM_SOURCES, ALL, and BACK_TRACKING. FROM_SOURCES is what
we do right now, BACK_TRACKING is what we will do in the next release, and
ALL is what you need. (A rough sketch of this is below.)

 - When all tasks are scheduled immediately, it may be that for a channel,
the sender is not yet deployed when the receiver is deployed. That should
be okay, since the same can happen right now when all-to-all patterns
connect the tasks.

 - The queues would still send notifications to the JobManager that data is
available, but the JM will see that the target task is already deployed (or
currently being deployed). Then the information about where to grab the
channel from would need to be sent to the task. That mechanism also exists
already.
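
As a rough sketch of the first change (the enum values are the ones
suggested above; all other names are illustrative and would need to be
adapted to the actual code):

    // Proposed schedule mode, attached to the JobGraph.
    public enum ScheduleMode {
        FROM_SOURCES,  // current behavior: deploy sources, schedule successors lazily
        BACK_TRACKING, // planned for the next release
        ALL            // deploy every task immediately (what streaming needs)
    }

    // In ExecutionGraph.scheduleForExecution(), roughly:
    public void scheduleForExecution() throws JobException {
        switch (scheduleMode) {
            case FROM_SOURCES:
                // existing path: only source vertices are scheduled here,
                // successors follow via ScheduleOrUpdateConsumers
                for (ExecutionJobVertex source : sourceVertices) {
                    source.scheduleAll(scheduler);
                }
                break;
            case ALL:
                // new path: kick off deployment of all tasks immediately
                for (ExecutionJobVertex vertex : getVerticesTopologically()) {
                    vertex.scheduleAll(scheduler);
                }
                break;
            case BACK_TRACKING:
                throw new UnsupportedOperationException("Not implemented yet");
        }
    }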


@Ufuk: It seems that it may actually work to simply kick off the deployment
of all tasks immediately (in the "ExecutionGraph.scheduleForExecution()"
method). Do you see any other implications?

Greetings,
Stephan


Re: Turn lazy operator execution off for streaming jobs

Till Rohrmann
I'm not sure whether it is currently possible to schedule the receiver
first and then the sender. Recently, I had to fix the
TaskManagerTest.testRunWithForwardChannel test case where exactly this was
the case. Because the receiver was scheduled first, it sometimes happened
that an IllegalQueueIteratorRequestException was thrown in the method
IntermediateResultPartitionManager.getIntermediateResultPartitionIterator.
The partition manager complained that the producer execution ID was
unknown. I assume that this has to be fixed first in order to schedule all
tasks immediately. But Ufuk will probably know it better.

Greets,

Till

Re: Turn lazy operator execution off for streaming jobs

Ufuk Celebi
On 22 Jan 2015, at 11:37, Till Rohrmann <[hidden email]> wrote:

> I'm not sure whether it is currently possible to schedule the receiver
> first and then the sender. Recently, I had to fix the
> TaskManagerTest.testRunWithForwardChannel test case where exactly this was
> the case. Because the receiver was scheduled first, it sometimes happened
> that an IllegalQueueIteratorRequestException was thrown in the method
> IntermediateResultPartitionManager.getIntermediateResultPartitionIterator.
> The partition manager complained that the producer execution ID was
> unknown. I assume that this has to be fixed first in order to schedule all
> tasks immediately. But Ufuk will probably know it better.

On 21 Jan 2015, at 20:58, Stephan Ewen <[hidden email]> wrote:

> - The queues would still send notifications to the JobManager that data is
> available, but the JM will see that the target task is already deployed (or
> currently being deployed). Then the information about where to grab the
> channel from would need to be sent to the task. That mechanism also exists
> already.

The only minor thing that needs to be adjusted is this mechanism. It is indeed in place already (e.g. UNKNOWN input channels are updated at runtime to LOCAL or REMOTE input channels, depending on the producer location), but currently the consumer tasks assume that the consumed intermediate result partition has already been created by the time they are deployed and request the partition. When we schedule all tasks at once, we might end up in situations like the test case Till described, where we know that it is a LOCAL or REMOTE channel, but the intermediate result has not been created yet and the request fails.

tl;dr: channels can be updated at runtime, but requests need to arrive after the producer created the partition.
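
The runtime update that is already in place looks roughly like this (UNKNOWN, LOCAL and REMOTE are the real channel kinds; the surrounding names are a sketch):

    // Sketch of the existing channel-update mechanism: a consumer deployed
    // before the producer location is final starts with an UNKNOWN input
    // channel, which is specialized once the producer location is known.
    InputChannel updateChannel(UnknownInputChannel unknown, ResultLocation location) {
        if (location.isLocal()) {
            // producer runs on the same TaskManager: read its queue directly
            return unknown.toLocalInputChannel();
        } else {
            // producer runs on another TaskManager: fetch data over the network
            return unknown.toRemoteInputChannel(location.getConnectionId());
        }
    }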

Re: Turn lazy operator execution off for streaming jobs

Stephan Ewen
So the crux is that the JobManager has a location for the sender task (and
tells that to the receivers) before the senders have registered their
transfer queues.

Can we just establish a "happens-before" there?
 - The TaskManager may send the "ack" for the deployment call only after all
queues are registered (it might even be like this now)
 - The JobManager updates receivers only with locations of senders that
have switched to "running", not with ones that are still in "deploying".

Would that fix it?
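
In code, the second rule could look roughly like this (DEPLOYING and RUNNING are the real execution states; the rest is illustrative):

    // Sketch: only hand out a concrete sender location once the sender is
    // RUNNING, i.e. after its ack implies the transfer queues are registered.
    if (producer.getState() == ExecutionState.RUNNING) {
        consumer.updateInputChannel(producer.getAssignedResourceLocation());
    } else {
        // still DEPLOYING: deploy the consumer with an UNKNOWN channel and
        // update it once the producer has switched to RUNNING
        consumer.deployWithUnknownChannel();
    }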


Re: Turn lazy operator execution off for streaming jobs

Ufuk Celebi
I've just looked into it, and it turns out that I had already implemented this in the current master. As long as the producer has not changed to RUNNING, the consumer will be scheduled with an UNKNOWN input channel. The problem that Till mentioned was specific to the test case, where the channel was hard-coded to a local input channel.

@Gyula: I've implemented the schedule mode as suggested by Stephan here: https://github.com/uce/incubator-flink/tree/schedule-all

You can set the schedule mode with

jobGraph.setScheduleMode(ScheduleMode.ALL)

I have just tested it with a small test case in JobManagerITCase. If you have time, you could try it out with some of the streaming programs. If this is what you need, I can open a PR later. :)
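
For example, in a job setup this would look like the following (the job construction is just illustrative; only setScheduleMode/ScheduleMode.ALL come from the branch):

    // Build the job graph as usual, then request eager deployment of all
    // tasks instead of the default lazy, sources-first scheduling.
    JobGraph jobGraph = new JobGraph("my streaming job");
    // ... add vertices and edges ...
    jobGraph.setScheduleMode(ScheduleMode.ALL);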

Re: Turn lazy operator execution off for streaming jobs

Gyula Fóra
Great, thanks! That was fast :)

I will try to check it out in the afternoon!

Gyula

Re: Turn lazy operator execution off for streaming jobs

Gyula Fóra
Hey,

I checked it with our examples, and it seems to be working properly and
exactly how we wanted :).

One minor thing: we have a JIRA open for this, so if you could reference it
in the commit message, that would be good:
https://issues.apache.org/jira/browse/FLINK-1425

Thanks!
Gyula

