Posted by yingjie
URL: http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-FLIP-148-Introduce-Sort-Merge-Based-Blocking-Shuffle-to-Flink-tp45717.html
Hi devs,
Currently, Flink adopts a hash-style blocking shuffle implementation which
writes data sent to different reducer tasks into separate files
concurrently. Compared to the sort-merge based approach, which writes that
data together into a single file and merges small files into bigger ones,
the hash-based approach has several weak points when it comes to running
large scale batch jobs:
1. *Stability*: For high parallelism (tens of thousands) batch jobs, the
current hash-based blocking shuffle implementation writes too many files
concurrently, which puts high pressure on the file system, for example,
maintenance of too much file metadata and exhaustion of inodes or file
descriptors. All of these are potential stability issues. The sort-merge
based blocking shuffle doesn't have this problem because, for one result
partition, only one file is written at a time.
2. *Performance*: Large numbers of small shuffle files and random IO can
hurt shuffle performance a lot, especially on HDDs (on SSDs, sequential
reads also matter because of read-ahead and caching). For batch jobs
processing massive data, a small amount of data per subpartition is common
because of high parallelism. Besides, data skew is another cause of small
subpartition files. By merging the data of all subpartitions together into
one file, more sequential reads can be achieved.
3. *Resource*: In the current hash-based implementation, each subpartition
needs at least one buffer. For large scale batch shuffles, the memory
consumption can be huge. For example, we need at least 320M of network
memory per result partition if parallelism is set to 10000. Because of this
huge network memory consumption, it is hard to configure the network memory
for large scale batch jobs, and sometimes parallelism cannot be increased
just because of insufficient network memory, which leads to a bad user
experience.
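As a rough check of the numbers in points 1 and 3, the sketch below
estimates the number of concurrently written files and the minimum buffer
memory per result partition. It assumes one 32 KB network buffer per
subpartition; the constant and the function name are illustrative only and
are not Flink APIs.

```python
from typing import Tuple

# Illustrative arithmetic only; names are hypothetical, not Flink APIs.
BUFFER_SIZE_BYTES = 32 * 1024  # assumed size of one network buffer (32 KB)


def hash_shuffle_cost(parallelism: int) -> Tuple[int, int]:
    """Return (files written concurrently, minimum buffer bytes) per result partition."""
    files = parallelism  # hash-based shuffle: one file per subpartition
    min_memory = parallelism * BUFFER_SIZE_BYTES  # at least one buffer per subpartition
    return files, min_memory


files, min_memory = hash_shuffle_cost(10_000)
print(files)                       # 10000
print(min_memory / (1024 * 1024))  # 312.5 (MiB), roughly the 320M quoted above
```

Both quantities grow linearly with parallelism in the hash-based approach,
which is why they become a problem at tens of thousands of subpartitions.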
To improve Flink’s capability of running large scale batch jobs, we would
like to introduce sort-merge based blocking shuffle to Flink[1]. Any
feedback is appreciated.
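For a concrete picture of the idea, here is a minimal sketch of writing all
subpartitions of one result partition into a single file, ordered by
subpartition index, together with an index of (offset, length) per
subpartition. It is only an illustration under simplifying assumptions (all
records fit in memory, no spilling, no merging of multiple files); the
function names are hypothetical and this is not the implementation described
in the FLIP.

```python
import io
from typing import BinaryIO, Dict, Iterable, Tuple


def write_sorted(records: Iterable[Tuple[int, bytes]],
                 out: BinaryIO) -> Dict[int, Tuple[int, int]]:
    """Write (subpartition, payload) records into one file, grouped by subpartition.

    Returns an index mapping subpartition -> (offset, length) within the file.
    """
    # sorted() is stable, so record order within a subpartition is preserved.
    ordered = sorted(records, key=lambda r: r[0])
    index: Dict[int, Tuple[int, int]] = {}
    offset = 0
    for subpartition, payload in ordered:
        out.write(payload)
        start, length = index.get(subpartition, (offset, 0))
        index[subpartition] = (start, length + len(payload))
        offset += len(payload)
    return index


def read_subpartition(data: BinaryIO,
                      index: Dict[int, Tuple[int, int]],
                      subpartition: int) -> bytes:
    """Read one subpartition's data with a single seek and one sequential read."""
    offset, length = index.get(subpartition, (0, 0))
    data.seek(offset)
    return data.read(length)


buf = io.BytesIO()  # stands in for the single shuffle file
idx = write_sorted(
    [(2, b"c1"), (0, b"a1"), (2, b"c2"), (1, b"b1"), (0, b"a2")], buf
)
print(idx)                             # {0: (0, 4), 1: (4, 2), 2: (6, 4)}
print(read_subpartition(buf, idx, 2))  # b'c1c2'
```

Each consumer reads one contiguous region of the file, so only one file is
open for writing per result partition and reads are sequential.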
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink

Best,
Yingjie