Hi,
In Hadoop MapReduce there is the notion of "splittable" in the FileInputFormat. This has the effect that a single input file can be fed into multiple separate instances of the mapper that read the data. A lot has been documented (e.g. text is splittable per line, gzipped text is not splittable) and designed into the various file formats (like Avro and Parquet) to allow splittability.

The goal is that reading and parsing files can be done by multiple CPUs/systems in parallel.

How is this handled in Flink?
Can Flink read a single file in parallel?
How does Flink administrate/handle the possibilities regarding the various file formats?

The reason I ask is that I want to see if I can port this (now Hadoop-specific) hobby project of mine to work with Flink: https://github.com/nielsbasjes/splittablegzip

Thanks.

--
Best regards / Met vriendelijke groeten,

Niels Basjes
AFAIK Flink has a similar notion of splittable as Hadoop. Furthermore, for custom FileInputFormats you can set the attribute unsplittable = true if your file format cannot be split.
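To illustrate, a minimal sketch of a custom format that opts out of splitting could look roughly like this. The class name and the whole-file record handling are made up for illustration, and the field and method names are from memory of Flink's FileInputFormat base class, so double-check them before relying on this:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

// Hypothetical custom format that must see each file as a whole,
// so it sets the 'unsplittable' attribute mentioned above.
public class WholeFileInputFormat extends FileInputFormat<byte[]> {

    private boolean done;

    public WholeFileInputFormat(Path filePath) {
        super(filePath);
        // Tell the split calculation to emit exactly one split per file.
        this.unsplittable = true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        done = false;
    }

    @Override
    public boolean reachedEnd() {
        return done;
    }

    @Override
    public byte[] nextRecord(byte[] reuse) throws IOException {
        // Read the remainder of the (single) split as one record.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int read;
        while ((read = stream.read(chunk)) != -1) {
            buffer.write(chunk, 0, read);
        }
        done = true;
        return buffer.toByteArray();
    }
}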
> On 18. Feb 2018, at 13:28, Niels Basjes <[hidden email]> wrote:
> [...]
Hi Niels,
Jörn is right: although it offers different methods, Flink's InputFormat is very similar to Hadoop's InputFormat interface. The InputFormat.createInputSplits() method generates splits that can be read in parallel.

The FileInputFormat splits files at fixed boundaries (usually the HDFS block size) and expects the InputFormat to find the right places to start and end. For files read line-wise (TextInputFormat) or files with a record delimiter (DelimitedInputFormat), the formats read the first record after the first delimiter they find in their split and stop at the first delimiter after the split boundary. The BinaryInputFormat extends FileInputFormat but overrides the createInputSplits method.

So, how exactly a file is read in parallel depends on the createInputSplits() method of the InputFormat.

Hope this helps,
Fabian

2018-02-18 13:36 GMT+01:00 Jörn Franke <[hidden email]>:
> [...]
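To make the delimiter alignment described above concrete, here is a standalone sketch of the idea in plain Java. It is not the actual Flink code; it only shows the usual convention where each reader skips the partial record at the start of its byte range and reads through the record that crosses its end:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

// Standalone sketch, not Flink code: read only the lines that belong to one split.
public class DelimitedSplitSketch {

    public static List<String> readSplit(String path, long splitStart, long splitLength)
            throws IOException {
        List<String> records = new ArrayList<>();
        long splitEnd = splitStart + splitLength;
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            file.seek(splitStart);
            if (splitStart > 0) {
                // Not the first split: the bytes up to the first newline belong to a
                // record that the previous split reads completely, so discard them.
                file.readLine();
            }
            // Read records as long as the record starts at or before the split end;
            // the record crossing the boundary is finished here, not by the next split.
            while (file.getFilePointer() <= splitEnd) {
                String line = file.readLine();
                if (line == null) {
                    break; // end of file
                }
                records.add(line);
            }
        }
        return records;
    }
}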
Hi,
I wanted to spend some time on this idea I asked about a year ago.

So I had a look at the actual code and found the place where the file splits are calculated:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L601

What I realized is that apparently the current system for determining the FileInputSplits
1) lists the collection of input files,
2) sets a global flag if any file is NOT splittable,
3) and ONLY if all of them are splittable will any of them be split.

So (if I understand correctly) if I have an input directory with a 100 GB uncompressed text file and a 1 KB gzipped file, then BOTH will NOT be split. Why is that done this way?

I also found that the determination whether a file is NOT splittable rests completely on whether the decompression to be used is provided by an InflaterInputStreamFactory. Which is funny, because BZIP2 as a compression IS splittable, yet according to this implementation it isn't.

I also noticed that the Hadoop split calculation is controlled by a split size, whereas the Flink implementation is controlled by the number of splits. This seems logical to me, as in Hadoop (MapReduce) the number of tasks running is dynamic, in contrast with Flink, where the number of parallel tasks is a given setting.

Looking at how Hadoop does this I see that the FileInputFormat has a method isSplitable (yes, with a typo, and with a terribly dangerous default value: MAPREDUCE-2094) which gives the answer on a per-file basis. Some file formats (like TextInputFormat) look at the codec that was found to see if it implements the SplittableCompressionCodec interface to determine if the file is splittable. Other file formats (like Avro and Parquet) use something else to determine this.

Overall I currently think the way this has been implemented in Flink is not very good. However, the gap to bridge to make it similar to what Hadoop has seems like a huge step. I expect that many classes and interfaces will need to change dramatically to make this happen.

My main question to you guys: do you think it is worth it?
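For reference, the per-file decision Hadoop makes boils down to roughly the following, paraphrased from memory of TextInputFormat (check the Hadoop sources for the exact code):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Roughly what Hadoop's TextInputFormat does: the answer is given per file,
// based on the codec that matches the file name.
public class PerFileSplittabilityExample extends TextInputFormat {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec =
                new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (codec == null) {
            return true;                                    // uncompressed text: splittable
        }
        return codec instanceof SplittableCompressionCodec; // e.g. BZip2Codec is, GzipCodec is not
    }
}

A per-file decision of this shape is what I would like Flink to offer as well.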
Niels Basjes

On Mon, Feb 19, 2018 at 10:50 AM Fabian Hueske <[hidden email]> wrote:
> [...]

--
Best regards / Met vriendelijke groeten,

Niels Basjes
Hi Niels,
Thanks for starting this discussion. Let me share some thoughts on your questions/proposals.

> Judge whether splittable for each individual file

Looks good to me.

> BZIP2 support splittable

Looks good to me.

> the Flink implementation is controlled by the number of splits

Can you check again? I think Flink is also affected by the block size [1].

> Looking at how Hadoop does this I see that the FileInputFormat has a method isSplitable

Right now Flink does this in FileInputFormat, but it could also do it the way Hadoop does.

I can see that the split strategies in Hadoop ORC and Parquet are quite complex. I have not researched them in depth, but I think those implementations are there for good reasons.

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L644
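For completeness, this is roughly how I read the calculation around that line. The names and structure below are my own simplified sketch, not the actual Flink code:

// Simplified sketch of the split-size derivation as I read FileInputFormat.java.
public class SplitSizeSketch {

    static long computeSplitSize(long totalLength, int minNumSplits,
                                 long blockSize, long minSplitSize) {
        // Upper bound so that we end up with at least minNumSplits splits in total.
        long maxSplitSize = totalLength / minNumSplits
                + (totalLength % minNumSplits == 0 ? 0 : 1);
        // Prefer block-sized splits, but never go below the configured minimum.
        return Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
    }

    public static void main(String[] args) {
        // 1 GiB file, parallelism 4, 128 MiB HDFS blocks, 1-byte minimum:
        // the block size wins, so the file is cut into 128 MiB splits.
        System.out.println(computeSplitSize(1L << 30, 4, 128L << 20, 1));
    }
}

So the number of splits matters, but the block size bounds the result as well.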
On Thu, Apr 23, 2020 at 4:13 PM Niels Basjes <[hidden email]> wrote:
> [...]

--
Best,
Jingsong Lee
Hi,
Yes, you are correct, Flink does use a minSplitSize to determine the splits. I missed that part. Also, this is the part I do not intend to change.

With this I would focus on essentially two things:

- "Is splittable" is decided per file. Similar to Hadoop: both a codec and a file format get an "isSplittable" method/marker that lets this be decided per file at runtime. This should be the same construct for all file formats (so also for Text, Avro, Parquet, etc.). Also, if I create a new file format for something that does not support splitting (like some specific data structures using, for example, XML), then that should be cleanly possible.

- The codecs must be overridable. For https://github.com/nielsbasjes/splittablegzip to work, the default gzip decompressor must be disabled. The current setup seems to make this possible, but I'm not sure (see the rough sketch below): https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L118

I intend to leave the way the splits are calculated as-is.
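Something along these lines is what I have in mind. The InflaterInputStreamFactory interface and the registerInflaterInputStreamFactory method are what I believe FileInputFormat.java offers at that line, but please double-check them; the factory class itself is purely hypothetical:

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;

// Rough sketch: replace the default gzip decompressor for ".gz" files.
public class SplittableGzipRegistration {

    // Hypothetical factory that would wrap the splittable gzip stream
    // from https://github.com/nielsbasjes/splittablegzip instead of GZIPInputStream.
    static class SplittableGzipStreamFactory implements InflaterInputStreamFactory<InputStream> {

        @Override
        public InputStream create(InputStream in) throws IOException {
            // Illustration only: here the splittable gzip stream would be returned.
            throw new UnsupportedOperationException("illustration only");
        }

        @Override
        public Collection<String> getCommonFileExtensions() {
            return Arrays.asList("gz", "gzip");
        }
    }

    public static void main(String[] args) {
        // Re-register the handling for ".gz" so the default gzip decompressor is not used.
        FileInputFormat.registerInflaterInputStreamFactory("gz", new SplittableGzipStreamFactory());
    }
}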
Niels

On Thu, Apr 23, 2020 at 11:33 AM Jingsong Li <[hidden email]> wrote:
> [...]
--
Best regards / Met vriendelijke groeten,

Niels Basjes