On 13.06.2016 12:46, Gyula Fóra wrote:

Hey,

The Flink scheduling mechanism has become quite a pain for us lately when scheduling IO-heavy streaming jobs. By IO-heavy I mean a job with fairly large state that is continuously updated and read.

The main problem is that the scheduled task slots are not evenly distributed among the task managers: usually the first TM takes as many slots as possible and the other TMs get far fewer. Since the job is RocksDB IO bound, the uneven load causes a significant performance penalty.

This is further accentuated during historical runs, when we try to "fast-forward" the application. The difference can be substantial on a 3-4 node cluster: with even task distribution the history might run 3 times faster than with an uneven one.

I was wondering if there is a simple way to modify the scheduler so that it allocates resources in a round-robin fashion. Probably someone has a lot of experience with this already :) (I'm running 1.0.3 for this job, btw.)

Cheers,
Gyula
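To illustrate the difference Gyula describes, here is a minimal, self-contained Java sketch comparing a first-fit slot chooser (which piles sub-tasks onto the first TaskManager) with a round-robin one (which spreads them evenly). All class and method names below are made up for illustration; this is not Flink's scheduler code.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    public class SlotSpreadDemo {

        static class TaskManager {
            int usedSlots;
            final int totalSlots;

            TaskManager(int totalSlots) {
                this.totalSlots = totalSlots;
            }

            boolean hasFreeSlot() {
                return usedSlots < totalSlots;
            }
        }

        // First-fit: always pick the first TaskManager that still has a free
        // slot. This is the behaviour described above: TM1 fills up completely
        // before TM2 gets anything.
        static TaskManager firstFit(List<TaskManager> tms) {
            for (TaskManager tm : tms) {
                if (tm.hasFreeSlot()) {
                    return tm;
                }
            }
            throw new IllegalStateException("no free slots");
        }

        // Round-robin: rotate through the TaskManagers so that consecutive
        // sub-tasks land on different machines.
        static int next = 0;

        static TaskManager roundRobin(List<TaskManager> tms) {
            for (int i = 0; i < tms.size(); i++) {
                TaskManager tm = tms.get((next + i) % tms.size());
                if (tm.hasFreeSlot()) {
                    next = (next + i + 1) % tms.size();
                    return tm;
                }
            }
            throw new IllegalStateException("no free slots");
        }

        static List<TaskManager> cluster() {
            List<TaskManager> tms = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                tms.add(new TaskManager(8)); // 3 TMs with 8 slots each
            }
            return tms;
        }

        static String usage(List<TaskManager> tms) {
            return tms.stream().map(tm -> String.valueOf(tm.usedSlots))
                      .collect(Collectors.joining("/"));
        }

        public static void main(String[] args) {
            List<TaskManager> a = cluster();
            List<TaskManager> b = cluster();
            for (int i = 0; i < 12; i++) { // schedule 12 sub-tasks each way
                firstFit(a).usedSlots++;
                roundRobin(b).usedSlots++;
            }
            System.out.println("first-fit:   " + usage(a)); // prints 8/4/0
            System.out.println("round-robin: " + usage(b)); // prints 4/4/4
        }
    }

With one TaskManager carrying 8 RocksDB-heavy sub-tasks while another sits idle, the IO-bound job runs at the pace of the busiest machine, which matches the slowdown reported above.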
On 13.06.2016 14:58, Chesnay Schepler wrote:

FLINK-1003 may be related.
On 13.06.2016 15:11, Gyula Fóra wrote:

Thanks! I found this PR already, but it seemed to be completely outdated :)

Maybe it's worth restarting this discussion.

Gyula
On 17.06.2016 11:56, Till Rohrmann wrote:

Hi Gyula,

The scheduler actually deploys independent tasks in a round-robin fashion across the cluster, so your source sub-tasks, for example, should be spread evenly. However, whenever a sub-task has an input, the scheduler tries to deploy it on the same machine as one of its input sub-tasks (the preferred locations). If you have an n-to-m communication scheme, this means that all downstream sub-tasks depend on the same set of upstream sub-tasks. The TaskManager of the first upstream sub-task that still has free slots is selected for the next downstream sub-task.

This is where the clustered deployment happens, because we simply take the first TaskManager instead of checking that we spread the load evenly across all TaskManagers which execute upstream sub-tasks. I think it should not be a big change to add a mode where we spread the load across all TaskManagers in the set of preferred locations. The changes should go into the findInstance method in Scheduler.java:463.

Do you want to take the lead for this feature?

Cheers,
Till
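To make the suggested change concrete, here is a sketch of the two selection strategies side by side. This is not the actual findInstance code from Scheduler.java; the Instance class below is a simplified stand-in, and picking the least-loaded preferred location is just one way to realize the "spread out" mode Till describes.

    import java.util.List;

    // Simplified stand-in for the scheduler's view of a TaskManager; only
    // the one method this sketch needs is modelled.
    class Instance {
        private final int totalSlots;
        private int allocatedSlots;

        Instance(int totalSlots) {
            this.totalSlots = totalSlots;
        }

        void allocateSlot() {
            allocatedSlots++;
        }

        int getNumberOfAvailableSlots() {
            return totalSlots - allocatedSlots;
        }
    }

    class FindInstanceSketch {

        // Current behaviour as described above: the first preferred location
        // with a free slot wins, so downstream sub-tasks cluster on one TM.
        static Instance firstPreferred(List<Instance> preferredLocations) {
            for (Instance instance : preferredLocations) {
                if (instance.getNumberOfAvailableSlots() > 0) {
                    return instance;
                }
            }
            return null; // caller falls back to any instance in the cluster
        }

        // Suggested mode: among the preferred locations, pick the one with
        // the most free slots, which approximates an even spread of the load.
        static Instance leastLoadedPreferred(List<Instance> preferredLocations) {
            Instance best = null;
            for (Instance instance : preferredLocations) {
                int free = instance.getNumberOfAvailableSlots();
                if (free > 0 && (best == null
                        || free > best.getNumberOfAvailableSlots())) {
                    best = instance;
                }
            }
            return best;
        }
    }

Note that the second strategy still respects locality: it only spreads among the TaskManagers that already host upstream sub-tasks, so no extra network hops are introduced compared to the first-fit choice.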
On 17.06.2016 16:27, Gyula Fóra wrote:

Hi Till,

Thanks for the pointers. I started looking into this, and it does not seem too complicated to add a new strategy :)

I will try to put together a PR, but I would greatly appreciate it if you could check it out once I'm done, as I don't have much experience with these components.

Cheers,
Gyula
Gyula Fóra wrote:

I have opened a PR <https://github.com/apache/flink/pull/2129> with my initial implementation. It seems to work well on the cluster, at least for fairly simple jobs (I haven't tried it with iterations). It would be great to get some feedback before I start diving into the scheduler tests :) But no need to hurry...

Gyula
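Since scheduler tests are named as the next step, here is a sketch of the kind of assertion such a test could make. The helper below is hypothetical and not taken from the PR: after scheduling, the per-TaskManager slot counts should differ by at most one if the spread-out mode works.

    import java.util.Collection;

    final class SpreadAssertions {

        // Hypothetical test helper: given the number of used slots on each
        // TaskManager after scheduling, fail if the distribution is uneven.
        static void assertEvenSpread(Collection<Integer> slotsPerTaskManager) {
            if (slotsPerTaskManager.isEmpty()) {
                return;
            }
            int min = Integer.MAX_VALUE;
            int max = Integer.MIN_VALUE;
            for (int used : slotsPerTaskManager) {
                min = Math.min(min, used);
                max = Math.max(max, used);
            }
            if (max - min > 1) {
                throw new AssertionError(
                        "uneven slot distribution: min=" + min + ", max=" + max);
            }
        }

        private SpreadAssertions() {}
    }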