Hello! Do you know how to modify the task scheduling method of Flink?
Hi Penguin,
What do you want to do? If you want to change Flink's scheduling behaviour, then take a look at the implementations of SchedulerNG.

Cheers,
Till
Hi Till,
Thank you for your reply. I found the following chain of method calls (the Flink version I use is Flink-1.11.1):

JobMaster#startScheduling
-> SchedulerBase#startScheduling
-> DefaultScheduler#startSchedulingInternal
-> EagerSchedulingStrategy#startScheduling
-> EagerSchedulingStrategy#allocateSlotsAndDeploy
-> DefaultScheduler#allocateSlotsAndDeploy

I'm going to try to follow this code first, but it has very few comments. My goal feels very difficult to me, and I hope I can get your help later.

Sincerely,
Penguin
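The chain above has two hand-offs: SchedulerBase#startScheduling delegates to a hook implemented by DefaultScheduler (startSchedulingInternal), and the scheduler then hands control to a strategy object that calls back into the scheduler to allocate slots and deploy. As an orientation aid, here is a minimal Java model of that shape. It is a sketch under stated assumptions: every Mini* class, CallChainDemo, and the string trace are hypothetical and only mirror the order of the calls listed above; they are not Flink's real interfaces, constructors, or signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, heavily simplified stand-ins for the Flink 1.11 classes in the
// call chain above. These are NOT Flink's real types, methods, or signatures.

/** Callback the strategy uses to hand work back to the scheduler. */
interface MiniSchedulerOperations {
    void allocateSlotsAndDeploy(List<String> vertexIds);
}

/** Decides which vertices to deploy and when (the scheduling-strategy role). */
interface MiniSchedulingStrategy {
    void startScheduling();
}

/** Deploys every vertex at once (the EagerSchedulingStrategy role). */
class MiniEagerStrategy implements MiniSchedulingStrategy {
    private final MiniSchedulerOperations ops;
    private final List<String> vertices;
    private final List<String> trace;

    MiniEagerStrategy(MiniSchedulerOperations ops, List<String> vertices, List<String> trace) {
        this.ops = ops;
        this.vertices = vertices;
        this.trace = trace;
    }

    @Override
    public void startScheduling() {
        trace.add("EagerSchedulingStrategy#startScheduling");
        allocateSlotsAndDeploy();
    }

    private void allocateSlotsAndDeploy() {
        trace.add("EagerSchedulingStrategy#allocateSlotsAndDeploy");
        // A real strategy would build per-vertex deployment options here.
        ops.allocateSlotsAndDeploy(vertices);
    }
}

/** Common entry point with a subclass hook (the SchedulerBase role). */
abstract class MiniSchedulerBase {
    protected final List<String> trace = new ArrayList<>();

    final void startScheduling() {
        trace.add("SchedulerBase#startScheduling");
        startSchedulingInternal();
    }

    protected abstract void startSchedulingInternal();
}

/** Owns a strategy fixed at construction time (the DefaultScheduler role). */
class MiniDefaultScheduler extends MiniSchedulerBase implements MiniSchedulerOperations {
    private final MiniSchedulingStrategy strategy;
    final List<String> deployed = new ArrayList<>();

    MiniDefaultScheduler(List<String> vertices) {
        // In this model, swapping the strategy created here is the hook for
        // different scheduling behaviour.
        this.strategy = new MiniEagerStrategy(this, vertices, trace);
    }

    @Override
    protected void startSchedulingInternal() {
        trace.add("DefaultScheduler#startSchedulingInternal");
        strategy.startScheduling();
    }

    @Override
    public void allocateSlotsAndDeploy(List<String> vertexIds) {
        trace.add("DefaultScheduler#allocateSlotsAndDeploy");
        deployed.addAll(vertexIds); // stand-in for slot allocation + deployment
    }
}

class CallChainDemo {
    /** Runs the model and returns the recorded call order. */
    static List<String> run() {
        MiniDefaultScheduler scheduler =
                new MiniDefaultScheduler(Arrays.asList("source-0", "map-0", "sink-0"));
        scheduler.trace.add("JobMaster#startScheduling"); // the JobMaster kicks things off
        scheduler.startScheduling();
        return scheduler.trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

CallChainDemo.run() returns the six calls in the same order as the chain above. The strategy is created inside the scheduler's constructor, so in this model that constructor is where a different strategy would be plugged in.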
Hi Penguin,
Jumping into this conversation since I worked on the same code base (Flink-1.11.2) for a recent project and might have a fresher memory of the method calls.

I believe the chain of methods you have highlighted is correct. Obviously, the chain will differ based on the configuration and context. This is a stack trace that I captured for a scheduling call in a simple Word Count streaming application:

selectBestSlotForProfile:46, LocationPreferenceSlotSelectionStrategy (org.apache.flink.runtime.jobmaster.slotpool)
tryAllocateFromAvailable:275, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
allocateMultiTaskSlot:470, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
allocateSharedSlot:311, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
internalAllocateSlot:160, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
allocateSlotInternal:143, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
allocateSlot:113, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
allocateSlot:115, SlotProviderStrategy$NormalSlotProviderStrategy (org.apache.flink.runtime.executiongraph)
lambda$allocateSlotsFor$0:104, DefaultExecutionSlotAllocator (org.apache.flink.runtime.scheduler)
apply:-1, 90274328 (org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator$$Lambda$679)
uniComposeStage:995, CompletableFuture (java.util.concurrent)
thenCompose:2137, CompletableFuture (java.util.concurrent)
allocateSlotsFor:102, DefaultExecutionSlotAllocator (org.apache.flink.runtime.scheduler)
allocateSlots:339, DefaultScheduler (org.apache.flink.runtime.scheduler)
allocateSlotsAndDeploy:312, DefaultScheduler (org.apache.flink.runtime.scheduler)
allocateSlotsAndDeploy:76, EagerSchedulingStrategy (org.apache.flink.runtime.scheduler.strategy)
startScheduling:52, EagerSchedulingStrategy (org.apache.flink.runtime.scheduler.strategy)
startSchedulingInternal:173, DefaultScheduler (org.apache.flink.runtime.scheduler)
startScheduling:461, SchedulerBase (org.apache.flink.runtime.scheduler)
startScheduling:897, JobMaster (org.apache.flink.runtime.jobmaster)

It might be useful to remote-debug a local setup to get the exact stack trace (method-call chain) for your scenario. Navigating the code through method calls to figure out the execution sequence can be a bit tricky, since Flink uses a lot of asynchronous calls.

According to my understanding, the basic scheduling flow goes like this: it initially calls DefaultScheduler#startScheduling, which makes an internal call to the preset scheduling strategy. The scheduling strategy is set when the DefaultScheduler is created. The scheduling strategy basically populates some deployment option configurations (though it can be used for more fine-grained scheduling changes), then tells the DefaultScheduler to allocate the slots and deploy. Another set of calls then allocates slots and selects a slot for each execution vertex (also taking the slot selection strategy into account).

This information is then sent to the TaskManager through an RPC call (via another chain of calls), and the TaskManager uses the task information to deploy the tasks according to the schedule.

Hope this helps. I also hope someone from the community will correct me if I have stated something incorrect.

Best,
Lasantha
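Two points from Lasantha's reply, the asynchronous calls and the slot selection at the top of the trace, can be seen together in a small model. This sketch assumes a slot is identified only by its TaskManager id and index, and that a vertex's preferred locations arrive as a future of TaskManager ids. SimpleSlot, SimpleSlotPool, and allocateSlotFor are hypothetical names; only CompletableFuture is a real API here, and the real LocationPreferenceSlotSelectionStrategy may weigh more than this sketch does (for example host-level locality or resource profiles).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the allocation path in the stack trace: SimpleSlot,
// SimpleSlotPool and allocateSlotFor are illustrative names, not Flink APIs.

final class SimpleSlot {
    final String taskManagerId;
    final int index;

    SimpleSlot(String taskManagerId, int index) {
        this.taskManagerId = taskManagerId;
        this.index = index;
    }

    @Override
    public String toString() {
        return taskManagerId + "/" + index;
    }
}

final class SimpleSlotPool {
    private final List<SimpleSlot> free = new ArrayList<>();

    SimpleSlotPool(List<SimpleSlot> slots) {
        free.addAll(slots);
    }

    /** Location preference: a free slot on a preferred TaskManager, else any free slot. */
    Optional<SimpleSlot> selectBestSlot(Set<String> preferredTaskManagers) {
        for (SimpleSlot slot : free) {
            if (preferredTaskManagers.contains(slot.taskManagerId)) {
                return Optional.of(slot);
            }
        }
        return free.stream().findFirst();
    }

    /** Completes with the chosen slot, or exceptionally if no slot is free. */
    CompletableFuture<SimpleSlot> allocateSlot(Set<String> preferredTaskManagers) {
        Optional<SimpleSlot> chosen = selectBestSlot(preferredTaskManagers);
        if (!chosen.isPresent()) {
            CompletableFuture<SimpleSlot> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("no free slot"));
            return failed;
        }
        free.remove(chosen.get());
        return CompletableFuture.completedFuture(chosen.get());
    }
}

final class AsyncAllocationDemo {
    /**
     * Waits for the vertex's preferred locations, then requests a slot.
     * If preferredLocations is already complete, the function passed to thenCompose
     * runs immediately on the calling thread (so thenCompose frames show up in its
     * stack trace); otherwise it runs later on whichever thread completes the future.
     */
    static CompletableFuture<SimpleSlot> allocateSlotFor(
            SimpleSlotPool pool, CompletableFuture<Set<String>> preferredLocations) {
        return preferredLocations.thenCompose(pool::allocateSlot);
    }

    public static void main(String[] args) {
        SimpleSlotPool pool = new SimpleSlotPool(
                Arrays.asList(new SimpleSlot("tm-1", 0), new SimpleSlot("tm-2", 0)));
        Set<String> preferTm2 = Collections.singleton("tm-2");
        System.out.println(allocateSlotFor(pool, CompletableFuture.completedFuture(preferTm2)).join());
        System.out.println(allocateSlotFor(pool, CompletableFuture.completedFuture(preferTm2)).join());
    }
}
```

Running main prints tm-2/0 and then tm-1/0: the second request still prefers tm-2, finds no free slot there, and falls back to tm-1. The comment on allocateSlotFor is also why a captured trace may or may not lead back to the caller, depending on whether the input future was already complete.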
Hi Lasantha,
Thanks for your reply. I also found the method call chain you described. Next, I'm going to study this code to see if I can modify it to schedule tasks to a slot on a specified TaskManager. But reading the source code is very difficult for me, especially without comments. It would be great if more people came to study and discuss this topic.

Best,
Penguin
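Penguin's goal (placing a task in a slot on a specific TaskManager) comes down to changing the rule at the top of Lasantha's trace, where a slot is picked for each execution vertex. Below is a minimal sketch of such a rule, assuming a user-supplied mapping from vertex ids to TaskManager ids. That mapping is a hypothetical input (nothing in this thread says Flink 1.11 exposes one), and TmSlot and PinnedSlotSelector are illustrative names, not Flink classes; porting the rule into Flink's actual slot selection interface is left open.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: TmSlot and PinnedSlotSelector are illustrative names,
// not Flink classes. It only shows a selection rule that could be ported into
// the slot selection step.

final class TmSlot {
    final String taskManagerId;
    final int index;

    TmSlot(String taskManagerId, int index) {
        this.taskManagerId = taskManagerId;
        this.index = index;
    }
}

final class PinnedSlotSelector {
    /** Vertex id -> TaskManager id it must run on; vertices not listed are unpinned. */
    private final Map<String, String> pins = new HashMap<>();

    PinnedSlotSelector pin(String vertexId, String taskManagerId) {
        pins.put(vertexId, taskManagerId);
        return this;
    }

    /**
     * Pinned vertex: only a free slot on its TaskManager qualifies, and an empty result
     * means "wait or fail" rather than silently falling back to another TaskManager.
     * Unpinned vertex: the first free slot.
     */
    Optional<TmSlot> select(String vertexId, Collection<TmSlot> freeSlots) {
        String required = pins.get(vertexId);
        return freeSlots.stream()
                .filter(slot -> required == null || slot.taskManagerId.equals(required))
                .findFirst();
    }

    public static void main(String[] args) {
        List<TmSlot> free = Arrays.asList(new TmSlot("tm-1", 0), new TmSlot("tm-2", 0));
        PinnedSlotSelector selector = new PinnedSlotSelector().pin("map-0", "tm-2");
        System.out.println(selector.select("map-0", free).map(s -> s.taskManagerId).orElse("none"));
        System.out.println(selector.select("source-0", free).map(s -> s.taskManagerId).orElse("none"));
    }
}
```

Whether an empty result should make the request wait or fail is a design choice the sketch leaves to the caller. TaskManager ids are usually only known at runtime, so in practice the pins would probably need to be resolved from something stable, such as host names.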