Hello Flink community,
what is the equivalent of the ScheduleOrUpdateConsumers message in the pipeline execution mode for the batch execution mode?

When I run a WordCount in pipeline mode, the scheduling of the receiving tasks is initiated in the ResultPartition class via the function notifyPipelinedConsumers*. This leads to a ScheduleOrUpdateConsumers message being sent to the JobManager and the JobManager takes care of the rest.

In the batch mode this does not seem to be the case, as the notifyPipelinedConsumers function will only work in the pipeline execution mode:

> if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers)

So then, how is the consumer scheduled, or at least notified of the consumable partition?

Cheers,
Niklas

* https://github.com/apache/flink/blob/572a45b379ca2231d772db4f115749fa08afcd10/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java#L416
Hey Niklas,
this is very much hidden unfortunately. You can find it in Execution#markFinished.

The last partition to be finished triggers the scheduling of the receivers.

From your comments I see that you have dug through the network stack code quite a bit. If you are interested, we can have a chat about refactoring things like the scheduling of the receivers to be more accessible/transparent.

– Ufuk

> On 09 Sep 2015, at 18:06, Niklas Semmler <[hidden email]> wrote:
>
> So then, how is the consumer scheduled, or at least notified of the consumable partition?
Hello Ufuk,
thanks for your amazingly quick reply.

I have seen the markFinished in Execution.java, but if I get it right, this is simply used to stop a task. The ScheduleOrUpdateConsumers message in the pipeline case on the other hand is notifying the consumers that a pipelined partition is ready and can now be consumed. Can you give me a hint on how the receiver is notified of a consumable partition in the batch case?

And yeah, it would be great if we could have a chat :).

Best,
Niklas

On 09.09.2015 18:11, Ufuk Celebi wrote:
> this is very much hidden unfortunately. You can find it in Execution#markFinished.
>
> The last partition to be finished triggers the scheduling of the receivers.
> On 09 Sep 2015, at 19:31, Niklas Semmler <[hidden email]> wrote:
>
> I have seen the markFinished in Execution.java, but if I get it right, this is simply used to stop a task. The ScheduleOrUpdateConsumers message in the pipeline case on the other hand is notifying the consumers that a pipelined partition is ready and can now be consumed. Can you give me a hint on how the receiver is notified of a consumable partition in the batch case?

Yes, this transitions the state of the respective execution to FINISHED. But if you look closely, there is a call “finishAllBlockingPartitions”, which schedules the receivers iff the finishing execution is the last one to finish the result.

You can think about the intermediate data as follows:

- On the job graph level you have: (Operator) -> [Result] -> (Operator), e.g. (map) -> [map result] -> (reduce)

- At runtime, when there are multiple parallel tasks, you have:
  * (Operator subtask 0) -> [Result partition 0] -> (Operator subtask 0)
  * (Operator subtask 1) -> [Result partition 1] -> (Operator subtask 1)

Now, if the exchange is blocking (in batch execution mode), the receivers are scheduled only once the result is finished. The question now is: when is the result finished? It is finished iff all subtasks producing it have finished. And that’s where markFinished comes into play. The last subtask to finish triggers the scheduling. This is non-deterministic, i.e. either subtask 0 or 1 can be the last to finish. This is kept track of via a simple counter. The task decrementing it to 0 triggers the scheduling.

(If the result is pipelined, the first data point already triggers the scheduling via the code paths you have looked at.)

Does this help?

– Ufuk
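To make the two triggers concrete, here is a minimal Java sketch of the idea described above. It is not Flink's actual code: the class and method names (BlockingResultSketch, PipelinedPartitionSketch, producerFinished, recordAdded) are hypothetical, and the only things taken from the thread are the "simple counter" for blocking results and the notify-once flag for pipelined ones.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a blocking intermediate result (not a Flink class). */
final class BlockingResultSketch {
    // Number of producer subtasks that have not finished yet.
    private final AtomicInteger runningProducers;

    BlockingResultSketch(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        this.runningProducers = new AtomicInteger(parallelism);
    }

    /**
     * Called once per producer subtask when it reaches FINISHED.
     * Returns true only for the call that decrements the counter to 0,
     * i.e. for whichever subtask happens to finish last.
     */
    boolean producerFinished() {
        int remaining = runningProducers.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("more finish calls than producers");
        }
        return remaining == 0;
    }
}

/** Hypothetical stand-in for a pipelined partition (not a Flink class). */
final class PipelinedPartitionSketch {
    // Mirrors the role of the hasNotifiedPipelinedConsumers flag quoted in the thread.
    private final AtomicBoolean hasNotifiedConsumers = new AtomicBoolean(false);

    /** Returns true only for the first record, which would trigger the notification. */
    boolean recordAdded() {
        return hasNotifiedConsumers.compareAndSet(false, true);
    }
}

final class SchedulingSketch {
    public static void main(String[] args) {
        // Blocking exchange: two parallel producers, the last one to finish triggers scheduling.
        BlockingResultSketch result = new BlockingResultSketch(2);
        System.out.println("first producer finished, schedule consumers: " + result.producerFinished());
        System.out.println("last producer finished, schedule consumers: " + result.producerFinished());

        // Pipelined exchange: the first record triggers scheduling, later records do not.
        PipelinedPartitionSketch partition = new PipelinedPartitionSketch();
        System.out.println("first record, notify consumers: " + partition.recordAdded());
        System.out.println("second record, notify consumers: " + partition.recordAdded());
    }
}
```

The atomic counter is what makes "either subtask 0 or 1 can be the last" safe: whichever call observes 0 is the one that triggers scheduling, regardless of the order in which the subtasks finish.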
Hello Ufuk,
thanks, that makes very much sense :).

Best,
Niklas

On 09.09.2015 23:22, Ufuk Celebi wrote:
> Does this help?
>
> – Ufuk

--
PhD Student / Research Assistant
INET, TU Berlin
Room 4.029
Marchstr 23
10587 Berlin
Tel: +49 30 314 75739