http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-FLIP-148-Introduce-Sort-Merge-Based-Blocking-Shuffle-to-Flink-tp45717p45742.html
compared to hash-based implementation. My PoC implementation without any
optimization. I haven't ever tried to find where the upper bound is, but I
> Hi Yingjie,
>
> thanks for proposing the sort-merge based blocking shuffle. I like the
> proposal and it does not seem to change the internals of Flink. Instead it
> is an extension of existing interfaces which makes it a
> non-invasive addition.
>
> Do you have any numbers comparing the performance of the sort-merge based
> shuffle against the hash-based shuffle? To what parallelism can you scale
> up when using the sort-merge based shuffle?
>
> Cheers,
> Till
>
> On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <[hidden email]> wrote:
>
> > Hi devs,
> >
> > Currently, Flink adopts a hash-style blocking shuffle implementation, which
> > writes data sent to different reducer tasks into separate files
> > concurrently. Compared to the sort-merge based approach, which writes that
> > data together into a single file and merges small files into bigger ones,
> > the hash-based approach has several weak points when it comes to running
> > large-scale batch jobs:
> >
> > 1. *Stability*: For high-parallelism (tens of thousands) batch jobs, the
> > current hash-based blocking shuffle implementation writes too many files
> > concurrently, which puts high pressure on the file system, for example,
> > maintenance of too much file metadata and exhaustion of inodes or file
> > descriptors. All of these are potential stability issues. The sort-merge
> > based blocking shuffle doesn't have this problem because, for one result
> > partition, only one file is written at a time.
> > 2. *Performance*: Large numbers of small shuffle files and random IO can
> > hurt shuffle performance a lot, especially on HDDs (on SSDs, sequential
> > reads are also important because of read-ahead and caching). For batch jobs
> > processing massive data, a small amount of data per subpartition is common
> > because of high parallelism. Besides, data skew is another cause of small
> > subpartition files. By merging the data of all subpartitions together in
> > one file, more sequential reads can be achieved.
> > 3. *Resource*: For the current hash-based implementation, each subpartition
> > needs at least one buffer. For large-scale batch shuffles, the memory
> > consumption can be huge. For example, we need at least 320M of network
> > memory per result partition if parallelism is set to 10000. Because of this
> > huge network memory consumption, it is hard to configure the network memory
> > for large-scale batch jobs, and sometimes parallelism cannot be increased
> > just because of insufficient network memory, which leads to a bad user
> > experience.
> >
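As a rough check on the numbers in the list above, here is a minimal sketch of the arithmetic behind the 320M figure and the file counts. The 32 KB buffer size is an assumption (the quoted mail does not state it), and the function names are hypothetical helpers, not Flink APIs.

```python
# Back-of-the-envelope cost of a hash-style blocking shuffle for ONE result
# partition. BUFFER_SIZE_KB is an assumption; the thread does not state it.
BUFFER_SIZE_KB = 32


def hash_shuffle_cost(parallelism, buffer_size_kb=BUFFER_SIZE_KB):
    """Return (minimum buffer memory in MB, files written concurrently)."""
    # Hash-based: at least one buffer and one file per subpartition.
    min_memory_mb = parallelism * buffer_size_kb / 1000
    return min_memory_mb, parallelism


def sort_merge_files_written_at_once():
    """Per the proposal, one file is written at a time per result partition."""
    return 1


memory_mb, files = hash_shuffle_cost(10_000)
print(f"hash-based: >= {memory_mb:.0f} MB of buffers, {files} concurrent files")
print(f"sort-merge: {sort_merge_files_written_at_once()} file at a time")
```

Under that assumption, 10000 subpartitions times 32 KB gives the 320M quoted in point 3, and the same parallelism is also the number of files written concurrently in point 1.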
> > To improve Flink’s capability of running large scale batch jobs, we would
> > like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> > feedback is appreciated.
> >
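To make the idea of writing all subpartitions into one file concrete, here is a toy sketch. It is not Flink's implementation and not the design in the FLIP: the function names, the in-memory sort, and the (offset, length) index are all assumptions chosen for illustration, and the merge step for multiple spilled files is left out.

```python
import io


def write_sorted_partition(records, num_subpartitions, out):
    """Write (subpartition, payload) records into a single stream.

    Records are sorted by subpartition index so each subpartition's data is
    contiguous. Returns a list of (offset, length), one per subpartition.
    """
    lengths = [0] * num_subpartitions
    # sorted() is stable, so records keep their order within a subpartition.
    for sub, payload in sorted(records, key=lambda r: r[0]):
        out.write(payload)
        lengths[sub] += len(payload)

    index, offset = [], 0
    for length in lengths:
        index.append((offset, length))
        offset += length
    return index


def read_subpartition(inp, index, sub):
    """Read one subpartition with a single seek and one sequential read."""
    offset, length = index[sub]
    inp.seek(offset)
    return inp.read(length)


buf = io.BytesIO()  # stands in for the single shuffle file
idx = write_sorted_partition(
    [(2, b"c1"), (0, b"a1"), (2, b"c2"), (1, b"b1")], 3, buf
)
print(read_subpartition(buf, idx, 2))
```

Only one output stream is open no matter how many subpartitions there are, and each reducer's data is one contiguous region, which is what allows the more sequential reads described in point 2.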
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> >
> > Best,
> > Yingjie
> >
>