Hi devs,
Currently, Flink adopts a hash-style blocking shuffle implementation which writes the data sent to different reducer tasks into separate files concurrently. Compared to the sort-merge based approach, which writes the data together into a single file and merges small files into bigger ones, the hash-based approach has several weak points when it comes to running large scale batch jobs:

1. *Stability*: For high parallelism (tens of thousands) batch jobs, the current hash-based blocking shuffle implementation writes too many files concurrently, which puts high pressure on the file system, for example, maintenance of too many file metas and exhaustion of inodes or file descriptors. All of these are potential stability issues. The sort-merge based blocking shuffle does not have this problem because for one result partition, only one file is written at a time.
2. *Performance*: Large numbers of small shuffle files and random IO can hurt shuffle performance a lot, especially on HDD (for SSD, sequential read also matters because of read-ahead and caching). For batch jobs processing massive data, a small amount of data per subpartition is common because of high parallelism. Besides, data skew is another cause of small subpartition files. By merging the data of all subpartitions together into one file, more sequential reads can be achieved.
3. *Resource*: For the current hash-based implementation, each subpartition needs at least one buffer. For large scale batch shuffles, the memory consumption can be huge. For example, we need at least 320M of network memory per result partition if parallelism is set to 10000. Because of this huge network memory consumption, it is hard to configure the network memory for large scale batch jobs, and sometimes parallelism cannot be increased just because of insufficient network memory, which leads to a bad user experience.

To improve Flink's capability of running large scale batch jobs, we would like to introduce a sort-merge based blocking shuffle to Flink [1].
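To make the "320M per result partition" figure in point 3 concrete, here is a rough back-of-the-envelope sketch, assuming the default 32 KB network buffer size (the constants are illustrative, not taken from any Flink source file):

```java
public class ShuffleMemoryEstimate {
    public static void main(String[] args) {
        final long parallelism = 10_000;        // downstream (reducer) tasks = subpartitions
        final long bufferSizeBytes = 32 * 1024; // assumed 32 KB network buffer

        // Hash-based blocking shuffle needs at least one buffer per subpartition.
        long minBytes = parallelism * bufferSizeBytes;
        System.out.println(minBytes / (1024 * 1024) + " MB per result partition");
        // ~312 MB, i.e. roughly the 320M cited above
    }
}
```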
Any feedback is appreciated.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink

Best,
Yingjie
Hi Yingjie,
thanks for proposing the sort-merge based blocking shuffle. I like the proposal and it does not seem to change the internals of Flink. Instead, it is an extension of existing interfaces, which makes it a non-invasive addition.

Do you have any numbers comparing the performance of the sort-merge based shuffle against the hash-based shuffle? To what parallelism can you scale up when using the sort-merge based shuffle?

Cheers,
Till
Hi Till,
Thanks for your reply and comments.

You are right, the proposed sort-merge based shuffle is an extension of the existing blocking shuffle and does not change any default behavior of Flink.

As for the performance, according to our previous experience, a sort-merge based implementation can reduce the shuffle time by 30% to even 90% compared to the hash-based implementation. My PoC implementation, without any further optimization, can already reduce the shuffle time by over 10% on SSD and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.

After switching to the sort-merge based blocking shuffle, some of our users' jobs can scale up to over 20000 parallelism, though this needs some JM and RM side optimization. I have not yet tried to find where the upper bound is, but I guess several tens of thousands should be able to meet the needs of most users.

Best,
Yingjie
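For readers skimming the thread, a minimal sketch of the write path being proposed — buffer records per subpartition, then spill them sorted by subpartition index into one sequential region each inside a single data file, plus an index of region offsets — could look like the following. The class and method names are hypothetical illustrations, not the actual FLIP-148 API:

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: collect records per subpartition, then spill them
// ordered by subpartition index into ONE data file plus an offset index.
class SortMergePartitionWriter {
    // TreeMap keeps subpartitions sorted, so the spill is one sequential pass.
    private final Map<Integer, List<byte[]>> pending = new TreeMap<>();

    void emit(int subpartition, byte[] record) {
        pending.computeIfAbsent(subpartition, k -> new ArrayList<>()).add(record);
    }

    // Returns subpartition -> {offset, length} of its region in the data file.
    Map<Integer, long[]> spill(Path dataFile) throws IOException {
        Map<Integer, long[]> index = new LinkedHashMap<>();
        try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(dataFile))) {
            long offset = 0;
            for (Map.Entry<Integer, List<byte[]>> e : pending.entrySet()) {
                long start = offset;
                for (byte[] rec : e.getValue()) {
                    out.write(rec);
                    offset += rec.length;
                }
                index.put(e.getKey(), new long[] {start, offset - start});
            }
        }
        return index;
    }
}
```

Each reducer then fetches its subpartition's region sequentially via the index, which is what turns the many small random reads of the hash-based layout into a few sequential ones.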
Thanks for sharing the preliminary numbers with us, Yingjie. The numbers look quite impressive :-)

Cheers,
Till
Thanks for launching the discussion and the respective FLIP, Yingjie!
In general, I am +1 for this proposal since the sort-merge approach has already been adopted widely in other batch-based projects, like MR, Spark, etc. And it indeed has performance benefits in some scenarios, as mentioned in the FLIP.

I only have some thoughts on the `Public Interfaces` section, since it concerns how users understand the feature and use it well in practice. As for the newly introduced classes, they can be further reviewed in the follow-up PR since there is no existing interface refactoring ATM.

1. taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition: the default value should be `1`, I guess? It is better to give a proper default value so that most users do not need to care about it in practice.

2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: how about making the default the number of required buffers in LocalBufferPool, as is done now for result partitions? Then it is transparent for users, who would not need to increase any memory resources with either the hash-based or the sort-merge based way. For tuned settings, it is better to give some hints to guide users on how to adjust it for better performance based on some factors.

3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I guess it is not easy to determine a proper value for the switch between hash-based and sort-merge based. And how much data a subpartition takes (broadcast), or whether it is unsuitable for the hash-based way, is not completely decided by the parallelism. Users might be confused about how to tune it in practice. I prefer giving a simple boolean option for easy use, with a default value of false in the MVP. Then it will not bring any effects for users who upgrade to the new version by default, and the sort-merge option can be enabled to try out if users are willing in desired scenarios.
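For concreteness, the three options under discussion would sit in flink-conf.yaml roughly like this. The option names are taken from this thread's FLIP draft; the values shown are illustrative placeholders, not recommended defaults:

```yaml
# Draft FLIP-148 options as discussed in this thread (values illustrative only).
taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition: 1
taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: 128
taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: 1024
```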
Best,
Zhijiang

------------------------------------------------------------------
From: Till Rohrmann <[hidden email]>
Send Time: Friday, October 16, 2020, 15:42
To: dev <[hidden email]>
Subject: Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink
Hi Zhijiang,
Thanks for your reply and suggestions.

1. For taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition, we decided to append all data produced by one result partition to one file, so this option will be removed.

2. For taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition, the number of required buffers of the buffer pool will be min(numSubpartitions + 1, this config value), so it does not increase the number of required buffers but may reduce it when the parallelism is very high. So when users switch to the sort-merge implementation, there should be no insufficient network buffers issue.

3. For taskmanager.network.sort-merge-blocking-shuffle.min-parallelism, I agree a boolean value is easier for users to configure, so we will replace this with a boolean switch. We can add this config option back if we have performance concerns in the future.

Best,
Yingjie
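The min(numSubpartitions + 1, configValue) rule from point 2 above can be sketched as a tiny helper (hypothetical names, not the actual Flink code):

```java
public class SortShuffleBuffers {
    // Required buffers for a sort-merge result partition, per the rule in this thread:
    // min(numSubpartitions + 1, configuredBuffersPerPartition).
    static int requiredBuffers(int numSubpartitions, int configuredBuffersPerPartition) {
        return Math.min(numSubpartitions + 1, configuredBuffersPerPartition);
    }

    public static void main(String[] args) {
        // At parallelism 10000 with e.g. 2048 configured buffers, the requirement is
        // capped at 2048 instead of the 10000+ buffers the hash-based shuffle needs;
        // at low parallelism (e.g. 100) it needs only 101, less than the configured cap.
        System.out.println(requiredBuffers(10_000, 2048));
        System.out.println(requiredBuffers(100, 2048));
    }
}
```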
Hi devs,
This discussion thread has been opened for over a week. If there is no other concerns, I'd like to open a voting thread soon. Best, Yingjie Yingjie Cao <[hidden email]> 于2020年10月23日周五 上午11:56写道: > Hi Zhijiang, > > Thanks for your reply and suggestions. > > 1. For > taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition, we > decide to append all data produced by one result partition to one file, so > this option will be removed. > > 2. For > taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition, the > required buffer of the buffer pool will be min(numSubpartition + 1, this > config value), so there it does not increase the number of required buffers > but may reduce it when the parallelism is very high. So when user switch to > sort-merge implementation, there should be no insufficient network buffers > issue. > > 3. For taskmanager.network.sort-merge-blocking-shuffle.min-parallelism, I > agree a bool value is easier to config for user, so we will replace this > with a bool switch. We can add this config option back is we have > performance concerns in the future. > > Best, > Yingjie > > Zhijiang <[hidden email]> 于2020年10月19日周一 下午5:27写道: > >> Thanks for launching the discussion and the respective FLIP, Yingjie! >> >> In general, I am +1 for this proposal since sort-merge ability has >> already been taken widely in other batch-based project, like MR, Spark, etc. >> And it indeed has some performance benefits in some scenarios as >> mentioned in FLIP. >> >> I only have some thoughts for the section of `Public Interfaces` since it >> cares about how the users understand and better use in practice. >> As for the new introduced classes, the can be further reviewed in follow >> up PR since without existing interfaces refactoring ATM. >> >> 1. >> taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition: >> the default value should be `1` I guess? 
It is better to give a proper >> default value that most of users do not need to >> care about it in practice. >> >> 2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: >> how about making the default for the number of required buffers in >> LocalBufferPool as now for result partition? >> Then it is transparent for users to not increase any memory resource no >> matter with either hash based or sort-merge based way. For the tuned >> setting , it is better to give some hints to guide >> users how to adjust it for better performance based on some factors. >> >> 3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I >> guess it is not very easy or determined to give a proper value for the >> switch between hash based and sort-merge based. >> And how much data a subpartition taking (broadcast) or not suitable for >> hash based is not completely decided by the number of parallelism somehow. >> And users might be confused how to tune >> it in practice. I prefer to giving a simple boolean type option for easy >> use and the default value can be false in MVP. Then it will not bring any >> effects for users after upgrade to new version by default, >> and sort-merge option can be enabled to try out if users willing in >> desired scenarios. >> >> Best, >> Zhijiang >> ------------------------------------------------------------------ >> From:Till Rohrmann <[hidden email]> >> Send Time:2020年10月16日(星期五) 15:42 >> To:dev <[hidden email]> >> Subject:Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking >> Shuffle to Flink >> >> Thanks for sharing the preliminary numbers with us Yingjie. The numbers >> look quite impressive :-) >> >> Cheers, >> Till >> >> On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao <[hidden email]> >> wrote: >> >> > Hi Till, >> > >> > Thanks for your reply and comments. 
>> > >> > You are right, the proposed sort-merge based shuffle is an extension of >> the >> > existing blocking shuffle and does not change any default behavior of >> > Flink. >> > >> > As for the performance, according to our previous experience, sort-merge >> > based implementation can reduce the shuffle time by 30% to even 90% >> > compared to hash-based implementation. My PoC implementation without any >> > further optimization can already reduce the shuffle time over 10% on SSD >> > and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job. >> > >> > After switch to sort-merge based blocking shuffle, some of our users' >> jobs >> > can scale up to over 20000 parallelism, though need some JM and RM side >> > optimization. I haven't ever tried to find where the upper bound is, >> but I >> > guess sever tens of thousand should be able to m >> > < >> > >> http://www.baidu.com/link?url=g0rAiJfPTxlMOJ4v6DXQeXhu5Y5HroJ1HHBHo34fjTZ5mtC0aYfog4eRKEnJmoPaImLyFafqncmA7l3Zowb8vovv6Dy9VhO3TlAtjNqoV-W >> > >eet >> > the needs of most users. >> > >> > Best, >> > Yingjie >> > >> > Till Rohrmann <[hidden email]> 于2020年10月15日周四 下午3:57写道: >> > >> > > Hi Yingjie, >> > > >> > > thanks for proposing the sort-merge based blocking shuffle. I like the >> > > proposal and it does not seem to change the internals of Flink. >> Instead >> > it >> > > is an extension of existing interfaces which makes it a >> > > non-invasive addition. >> > > >> > > Do you have any numbers comparing the performance of the sort-merge >> based >> > > shuffle against the hash-based shuffle? To what parallelism can you >> scale >> > > up when using the sort-merge based shuffle? 
> > Cheers,
> > Till
> >
> > On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <[hidden email]> wrote:
> >
> > > Hi devs,
> > >
> > > [...]
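For concreteness, the options discussed earlier in the thread might look like the following in flink-conf.yaml. The option names are the ones proposed in this discussion and the values are purely illustrative; the names and defaults that eventually shipped may differ, so treat this as a sketch rather than a reference:

```yaml
# Illustrative sketch only: option names from this discussion, values made up.
# Whether hash-based or sort-merge based shuffle is used for a result
# partition would be decided by the parallelism threshold below.
taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: 8
taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: 1024
```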
Hi devs & users,
The FLIP-148[1] has been released with Flink 1.13, and the final implementation has some differences compared with the initial proposal in the FLIP document. To avoid potential misunderstandings, I have updated the FLIP document[1] accordingly, and I have also drafted another document[2] which contains more implementation details. FYI.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
[2] https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing

Best,
Yingjie

Yingjie Cao <[hidden email]> wrote on Thu, Oct 15, 2020 at 11:02 AM:

> Hi devs,
>
> [...]
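As an aside, the "320M network memory per result partition" figure from the original proposal can be checked with a quick back-of-the-envelope sketch. It assumes a 32 KB network buffer size (Flink's default memory segment size) and, as the proposal describes for the hash-based shuffle, at least one buffer per subpartition:

```python
# Minimum network memory of the hash-based shuffle grows linearly with
# parallelism, since each subpartition needs at least one network buffer.
BUFFER_SIZE_KB = 32       # assumed network buffer (memory segment) size
parallelism = 10_000      # number of subpartitions, one per consumer task

min_memory_mb = parallelism * BUFFER_SIZE_KB / 1024
print(f"{min_memory_mb:.1f} MB")  # 312.5 MB, roughly the 320M cited above
```

This is also why, under the hash-based scheme, raising the parallelism alone can exhaust the configured network memory.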
Thanks Yingjie for the great effort!
This is really helpful to Flink batch users!

Best,
Jingsong

On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <[hidden email]> wrote:

> [...]

--
Best, Jingsong Lee
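The "merge data of all subpartitions together in one file" idea behind the sequential-read benefit can be illustrated with a minimal, hypothetical sketch. This is not Flink's actual implementation, just the shape of the layout: one data file plus a per-subpartition (offset, length) index:

```python
# Simplified sketch of a merged shuffle file: records are grouped by
# subpartition, appended to one blob, and an index of (offset, length)
# per subpartition lets each consumer read its region sequentially.
import io

def write_merged(records_by_subpartition):
    """records_by_subpartition: dict {subpartition_id: [bytes, ...]}"""
    data = io.BytesIO()
    index = {}  # subpartition_id -> (offset, length)
    for sp_id in sorted(records_by_subpartition):
        start = data.tell()
        for record in records_by_subpartition[sp_id]:
            data.write(record)
        index[sp_id] = (start, data.tell() - start)
    return data.getvalue(), index

def read_subpartition(blob, index, sp_id):
    offset, length = index[sp_id]
    return blob[offset:offset + length]

blob, index = write_merged({0: [b"aa", b"bb"], 1: [b"cc"]})
print(read_subpartition(blob, index, 1))  # b'cc'
```

Each consumer then reads exactly one contiguous region of one file, which is what turns many small random reads into sequential ones.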
Thanks for the update Yingjie. Would it make sense to write a short blog
post about this feature including some performance improvement numbers? I think this could be interesting to our users.

Cheers,
Till

On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li <[hidden email]> wrote:

> [...]
Hi Till,
Thanks for the suggestion. The blog post is already on the way.

Best,
Yingjie

Till Rohrmann <[hidden email]> wrote on Tue, Jun 8, 2021 at 5:30 PM:

> [...]
Great :-)
On Tue, Jun 8, 2021 at 1:11 PM Yingjie Cao <[hidden email]> wrote:

> [...]