Scheduling in BATCH execution mode?


Niklas Semmler
Hello Flink community,

what is the batch execution mode equivalent of the
ScheduleOrUpdateConsumers message used in the pipeline execution mode?

When I run a WordCount in pipeline mode, the scheduling of the receiving
tasks is initiated in the ResultPartition class via the function
notifyPipelinedConsumers*. This leads to a ScheduleOrUpdateConsumers
message being sent to the JobManager, and the JobManager takes care of
the rest.

In batch mode this does not seem to be the case, as the
notifyPipelinedConsumers function only works in the pipeline
execution mode:

 > if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers)

So then, how is the consumer scheduled, or at least notified of the
consumable partition?

Cheers,
Niklas

*
https://github.com/apache/flink/blob/572a45b379ca2231d772db4f115749fa08afcd10/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java#L416
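
For reference, the guard quoted above behaves roughly like this. This is a simplified stand-in, not Flink's actual ResultPartition code; the class and member names below are illustrative only, mirroring the quoted condition:

```java
// Illustrative sketch of the pipelined notification guard: in pipelined mode,
// the first record emitted triggers a one-time notification (a stand-in for the
// ScheduleOrUpdateConsumers message to the JobManager); blocking partitions
// never take this path.
class ResultPartitionSketch {
    private final boolean pipelined;
    private boolean hasNotifiedPipelinedConsumers = false;
    private int notifications = 0; // stand-in for messages sent to the JobManager

    ResultPartitionSketch(boolean pipelined) {
        this.pipelined = pipelined;
    }

    void onRecordEmitted() {
        // mirrors: if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers)
        if (pipelined && !hasNotifiedPipelinedConsumers) {
            hasNotifiedPipelinedConsumers = true;
            notifications++; // would send ScheduleOrUpdateConsumers here
        }
    }

    int getNotifications() {
        return notifications;
    }
}

public class Main {
    public static void main(String[] args) {
        ResultPartitionSketch pipelined = new ResultPartitionSketch(true);
        pipelined.onRecordEmitted();
        pipelined.onRecordEmitted();
        System.out.println(pipelined.getNotifications()); // 1: notified exactly once

        ResultPartitionSketch blocking = new ResultPartitionSketch(false);
        blocking.onRecordEmitted();
        System.out.println(blocking.getNotifications()); // 0: guard never fires
    }
}
```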

Re: Scheduling in BATCH execution mode?

Ufuk Celebi
Hey Niklas,

this is unfortunately quite hidden. You can find it in Execution#markFinished.

The last partition to be finished triggers the scheduling of the receivers.

From your comments I see that you have dug through the network stack code quite a bit. If you are interested, we can have a chat about refactoring things like the scheduling of the receivers to be more accessible/transparent.

– Ufuk


Re: Scheduling in BATCH execution mode?

Niklas Semmler
Hello Ufuk,

thanks for your amazingly quick reply.

I have seen markFinished in Execution.java, but if I get it right,
it is simply used to stop a task. The ScheduleOrUpdateConsumers
message in the pipeline case, on the other hand, notifies the
consumers that a pipelined partition is ready and can now be consumed.
Can you give me a hint on how the receiver is notified of a consumable
partition in the batch case?

And yeah, it would be great if we could have a chat :).

Best,
Niklas


Re: Scheduling in BATCH execution mode?

Ufuk Celebi

Yes, this transitions the state of the respective execution to FINISHED. But if you look closely, there is a call to "finishAllBlockingPartitions", which schedules the receivers iff the last execution finishes the result.

You can think about the intermediate data as follows:

- On the job graph level you have: (Operator) -> [Result] -> (Operator), e.g. (map) -> [map result] -> (reduce)

- At runtime when there are multiple parallel tasks, you have:
  * (Operator subtask 0) -> [Result partition 0] -> (Operator subtask 0)
  * (Operator subtask 1) -> [Result partition 1] -> (Operator subtask 1)

Now, if the exchange is blocking (as in batch execution mode), the result is finished iff all subtasks producing it have finished.

And that’s where markFinished comes into play: the last subtask to finish triggers the scheduling of the receivers. Which subtask that is is non-deterministic, i.e. either subtask 0 or 1 can be the last to finish the result. This is tracked via a simple counter; the task that decrements it to 0 triggers the scheduling.

(If the result is pipelined, the first data point triggers the scheduling already via the code paths you have looked at.)
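
The counter mechanism can be sketched roughly like this. Again, this is illustrative only, not Flink's actual Execution/IntermediateResult code; all names below are stand-ins:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the blocking case: a result tracks how many producer
// subtasks are still running; the (non-deterministic) subtask that decrements
// the counter to zero is the one that schedules the consumers.
class BlockingResult {
    private final AtomicInteger unfinishedProducers;
    private boolean consumersScheduled = false;

    BlockingResult(int parallelism) {
        this.unfinishedProducers = new AtomicInteger(parallelism);
    }

    // Called from each producer subtask's markFinished(); returns true only
    // for the subtask that finishes the whole result.
    boolean producerFinished() {
        if (unfinishedProducers.decrementAndGet() == 0) {
            consumersScheduled = true; // stand-in for scheduling the receivers
            return true;
        }
        return false;
    }

    boolean isConsumersScheduled() {
        return consumersScheduled;
    }
}

public class Main {
    public static void main(String[] args) {
        BlockingResult result = new BlockingResult(2);
        System.out.println(result.producerFinished());     // false: one producer left
        System.out.println(result.producerFinished());     // true: last one schedules
        System.out.println(result.isConsumersScheduled()); // true
    }
}
```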

Does this help?

– Ufuk


Re: Scheduling in BATCH execution mode?

Niklas Semmler
Hello Ufuk,

thanks, that makes a lot of sense :).

Best,
Niklas


--
PhD Student / Research Assistant
INET, TU Berlin
Room 4.029
Marchstr 23
10587 Berlin
Tel: +49 30 314 75739