Hi Stephan (switching to dev list),
> On Aug 29, 2019, at 2:52 AM, Stephan Ewen <[hidden email]> wrote:
>
> That is a good point.
>
> Which way would you suggest to go? Not relying on the FS block size at all, but using a fixed (configurable) block size?

There’s value to not requiring a fixed block size, as then a file that’s moved between different file systems can be read using whatever block size is optimal for that system.

Hadoop handles this in sequence files by storing a unique “sync marker” value in the file header (essentially a 16-byte UUID), injecting one of these every 2K bytes or so (in between records); code can then scan for this marker to find record boundaries without relying on a block size. The idea is that 2^128 is a Big Number, so the odds of finding a false-positive sync marker in the data are low enough to be ignorable.

But that’s a bigger change. Simpler would be to put a header in each part file being written, with some signature bytes to aid in detecting old-format files.

Or maybe deprecate SerializedOutputFormat/SerializedInputFormat, and provide some wrapper glue to make it easier to write/read Hadoop SequenceFiles that have a null key, and store the POJO as the data value. Then you could also leverage Hadoop support for compression at either record or block level.

— Ken

> On Thu, Aug 29, 2019 at 4:49 AM Ken Krugler <[hidden email]> wrote:
>
>> Hi all,
>>
>> Wondering if anyone else has run into this.
>>
>> We write files to S3 using the SerializedOutputFormat<OurCustomPOJO>. When we read them back, sometimes we get deserialization errors where the data seems to be corrupt.
>>
>> After a lot of logging, the weathervane of blame pointed towards the block size somehow not being the same between the write (where it’s 64MB) and the read (unknown).
>>
>> When I added a call to SerializedInputFormat.setBlockSize(64MB), the problems went away.
>>
>> It looks like both input and output formats use fs.getDefaultBlockSize() to set this value by default, so maybe the root issue is S3 somehow reporting different values.
>>
>> But it does feel a bit odd that we’re relying on this default setting, versus it being recorded in the file during the write phase.
>>
>> And it’s awkward to try to set the block size on the write, as you need to set it in the environment conf, which means it applies to all output files in the job.
>>
>> — Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
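To make the SequenceFile idea above concrete, here is a minimal sketch of the kind of wrapper glue described, assuming the standard Hadoop SequenceFile API with a NullWritable key and the already-serialized POJO bytes as the value (class and path names are made up for illustration):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;

    // Sketch: store each POJO's serialized bytes as the value of a SequenceFile
    // record, with a NullWritable key. Record boundaries are found via the
    // SequenceFile sync markers, not via an assumed block size.
    public class SequenceFileWrapperSketch {

        public static void write(Path path, Iterable<byte[]> serializedPojos) throws IOException {
            Configuration conf = new Configuration();
            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(path),
                    SequenceFile.Writer.keyClass(NullWritable.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class),
                    SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK))) {
                for (byte[] pojoBytes : serializedPojos) {
                    writer.append(NullWritable.get(), new BytesWritable(pojoBytes));
                }
            }
        }

        public static void read(Path path) throws IOException {
            Configuration conf = new Configuration();
            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                    SequenceFile.Reader.file(path))) {
                NullWritable key = NullWritable.get();
                BytesWritable value = new BytesWritable();
                while (reader.next(key, value)) {
                    byte[] pojoBytes = value.copyBytes();  // deserialize the POJO from these bytes
                }
            }
        }
    }

Because SequenceFiles embed their own sync markers, splitting no longer depends on the block size the file system reports, and record- or block-level compression comes along for free.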
Sounds reasonable.
I am adding Arvid to the thread - IIRC he authored that tool in his Stratosphere days. And by a stroke of luck, he is now working on Flink again.

@Arvid - what are your thoughts on Ken's suggestions?
Hi Ken,
that's indeed a very odd issue that you found. I had a hard time connecting block size with S3 in the beginning and had to dig into the code. I still cannot fully understand why you got two different block size values from the S3 FileSystem. Looking into the Hadoop code, I found the following snippet:

    public long getDefaultBlockSize() {
        return this.getConf().getLong("fs.s3.block.size", 67108864L);
    }

I don't see a quick fix for that. It looks like mismatching configurations on different machines. We should probably have some sanity checks to detect mismatching block header information, but unfortunately, the block header is very primitive and doesn't allow for sophisticated checks.

So let's focus on implementation solutions:

1. I gather that you need support for data that uses IOReadableWritable, so moving to more versatile solutions like Avro or Parquet is unfortunately not an option. I'd still recommend that for any new project.
2. Storing the block size in the repeated headers in a file introduces a kind of chicken-and-egg problem: you need the block size to read the header that contains the block size.
3. Storing the block size once in the first block would require additional seeks and, depending on the degree of parallelism, would put a rather high load on the data node holding the first block.
4. Storing the block size in file system metadata would be ideal, but with the wide range of possible file systems it is most likely not doable.
5. Explicitly setting the block size would be the most reliable technique, but quite user-unfriendly, especially if multiple deployment environments use different block sizes.
6. Adding a periodic marker indeed seems to be the most robust option, and adding 20 bytes every 2K bytes doesn't seem too bad to me. The downside is that seeking can take a long time for larger records, as the reader will linearly scan through the bytes at the block start. However, if you really want to support copying files across file systems with different block sizes, this would be the only option. (A rough sketch of this idea follows below.)
7. Deprecating the sequence format is a good option in the long run. I simply don't see that for production code the performance gain over using Avro or Parquet would be noticeable, and getting a solid base concept for schema evolution will pay off quickly, in my experience.

@Ken, could you please describe what kind of data you use the sequence format for? I'd like to understand your requirements. How large are your records (order of magnitude)? Are they POJOs? Do you craft them manually?

Best,

Arvid
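As a quick sanity check (just a sketch; it assumes the S3 file system in use actually resolves this key, and other implementations such as s3a may read a different one), one could log the value each environment resolves and compare the write side with the read side:

    import org.apache.hadoop.conf.Configuration;

    // Sketch: print the block size this environment would resolve, mirroring the
    // getDefaultBlockSize() snippet above, to spot mismatching configurations.
    public class BlockSizeCheck {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            long blockSize = conf.getLong("fs.s3.block.size", 67108864L); // 64 MB default
            System.out.println("fs.s3.block.size = " + blockSize);
        }
    }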
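For reference, here is a rough, self-contained illustration of the periodic sync-marker idea from option 6. This is not Flink or Hadoop code; the interval, marker length, and length-prefixed record framing are assumptions made for the sketch:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.security.SecureRandom;
    import java.util.Arrays;

    // Sketch: a random 16-byte marker is written between records roughly every
    // SYNC_INTERVAL bytes; a reader starting at an arbitrary offset scans forward
    // to the next marker to find a record boundary, independent of block size.
    public class SyncMarkerSketch {

        static final int SYNC_INTERVAL = 2048;   // bytes between markers (Hadoop uses ~2K)
        static final int MARKER_LENGTH = 16;

        final byte[] marker = new byte[MARKER_LENGTH];
        long bytesSinceSync = 0;

        SyncMarkerSketch() {
            new SecureRandom().nextBytes(marker); // would be stored once in the file header
        }

        /** Write one length-prefixed record, preceded by a sync marker if enough bytes have passed. */
        void writeRecord(DataOutputStream out, byte[] record) throws IOException {
            if (bytesSinceSync >= SYNC_INTERVAL) {
                out.write(marker);
                bytesSinceSync = 0;
            }
            out.writeInt(record.length);
            out.write(record);
            bytesSinceSync += 4 + record.length;
        }

        /** Scan forward from the current position until the sync marker is found. */
        boolean seekToNextSync(InputStream in) throws IOException {
            byte[] window = new byte[MARKER_LENGTH];
            int filled = 0;
            int b;
            while ((b = in.read()) != -1) {
                if (filled < MARKER_LENGTH) {
                    window[filled++] = (byte) b;
                } else {
                    // slide the window by one byte
                    System.arraycopy(window, 1, window, 0, MARKER_LENGTH - 1);
                    window[MARKER_LENGTH - 1] = (byte) b;
                }
                if (filled == MARKER_LENGTH && Arrays.equals(window, marker)) {
                    return true; // the next bytes are a record length followed by a record
                }
            }
            return false;
        }
    }

A reader assigned an arbitrary split offset would call seekToNextSync() once and then read length-prefixed records until it passes the split end, which is essentially how Hadoop's SequenceFile reader handles splits.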
Hi Arvid,
Thanks for following up…

> On Sep 2, 2019, at 3:09 AM, Arvid Heise <[hidden email]> wrote:
>
> Hi Ken,
>
> that's indeed a very odd issue that you found. I had a hard time connecting block size with S3 in the beginning and had to dig into the code. I still cannot fully understand why you got two different block size values from the S3 FileSystem. Looking into the Hadoop code, I found the following snippet:
>
>     public long getDefaultBlockSize() {
>         return this.getConf().getLong("fs.s3.block.size", 67108864L);
>     }
>
> I don't see a quick fix for that. It looks like mismatching configurations on different machines. We should probably have some sanity checks to detect mismatching block header information, but unfortunately, the block header is very primitive and doesn't allow for sophisticated checks.

Yes - and what made it harder to debug is that when the incorrect block size was set to 32MB, sometimes the first split that got processed was split[1] (the second actual split). In that situation, the block info record was where the code expected it to be (since it was reading from 64MB - record size), so it all looked OK, but then the first record processed would be at an incorrect position.

> So let's focus on implementation solutions:
> 1. I gather that you need support for data that uses IOReadableWritable, so moving to more versatile solutions like Avro or Parquet is unfortunately not an option. I'd still recommend that for any new project.

See below - it’s not a requirement, but certainly easier.

> 2. Storing the block size in the repeated headers in a file introduces a kind of chicken-and-egg problem: you need the block size to read the header that contains the block size.
> 3. Storing the block size once in the first block would require additional seeks and, depending on the degree of parallelism, would put a rather high load on the data node holding the first block.
> 4. Storing the block size in file system metadata would be ideal, but with the wide range of possible file systems it is most likely not doable.
> 5. Explicitly setting the block size would be the most reliable technique, but quite user-unfriendly, especially if multiple deployment environments use different block sizes.
> 6. Adding a periodic marker indeed seems to be the most robust option, and adding 20 bytes every 2K bytes doesn't seem too bad to me. The downside is that seeking can take a long time for larger records, as the reader will linearly scan through the bytes at the block start. However, if you really want to support copying files across file systems with different block sizes, this would be the only option.
> 7. Deprecating the sequence format is a good option in the long run. I simply don't see that for production code the performance gain over using Avro or Parquet would be noticeable, and getting a solid base concept for schema evolution will pay off quickly, in my experience.
>
> @Ken, could you please describe what kind of data you use the sequence format for? I'd like to understand your requirements. How large are your records (order of magnitude)? Are they POJOs? Do you craft them manually?

They are hand-crafted POJOs, typically about 1.2K/record.

It’s a mapping from words to feature vectors (and some additional data).

I then use them as a backing store with a cache (in a downstream job) as side-input to a map function that creates word vectors from large collections of text. This is why the serialized format was appealing, as it’s then relatively straightforward to use the existing deserialization logic when reading from my custom Java code.

So yes, I could switch to Parquet with some additional work. I’ve used that format before in Hadoop jobs, but I’ve never tried directly reading from it.

Ditto for Avro. Note that in my use case I don’t have to worry about evolving the schema, as it’s just transient data used in the middle of a batch workflow (to avoid really, really big joins that take forever).

Regards,

— Ken
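For readers following along, a hypothetical example of the kind of record described above (the name WordFeatures and its fields are invented; the real POJOs presumably carry additional data):

    import java.io.IOException;

    import org.apache.flink.core.io.IOReadableWritable;
    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataOutputView;

    // Hypothetical record: a word plus its feature vector, serialized via
    // IOReadableWritable so it works with SerializedOutputFormat /
    // SerializedInputFormat and with custom reader code.
    public class WordFeatures implements IOReadableWritable {

        private String word;
        private float[] features;

        public WordFeatures() {}  // the formats need a no-arg constructor

        public WordFeatures(String word, float[] features) {
            this.word = word;
            this.features = features;
        }

        @Override
        public void write(DataOutputView out) throws IOException {
            out.writeUTF(word);
            out.writeInt(features.length);
            for (float f : features) {
                out.writeFloat(f);
            }
        }

        @Override
        public void read(DataInputView in) throws IOException {
            word = in.readUTF();
            features = new float[in.readInt()];
            for (int i = 0; i < features.length; i++) {
                features[i] = in.readFloat();
            }
        }
    }

A class like this can be written by SerializedOutputFormat and read back either by SerializedInputFormat or by standalone code that calls read() on a DataInputView wrapped around the input stream.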
Hi Ken,
as far as I understood, you are using the format to overcome some shortcomings in Flink. There would be no need to even look at the data, or even to create it, if the join worked decently. If so, then it makes sense to keep the format, as I expect similar issues to keep appearing, and providing at least some kind of workaround will probably help users.

At that point, I'm leaning towards embedding the sync markers. However, I'd use larger gaps than 2K and a larger sync pattern; even for your use case, 2K would mean we barely fit two records between markers. I'd probably also add another format instead of extending the existing one. I see them as different concepts: the old one assumes a fixed block size, while the new one doesn't care about it. I opened a ticket for that: https://issues.apache.org/jira/browse/FLINK-13956

For the time being, I'd recommend just setting a fixed block size. If it's on S3, the block size doesn't really matter much, and you can pretty much tune it to the needed degree of parallelism.

Additionally, it would be interesting to see if we can actually get that join going. Could you please describe the join and its performance in another thread on the user list [hidden email]?

Best,

Arvid
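A minimal sketch of that interim workaround, pinning the same block size on both the write and read side instead of relying on fs.getDefaultBlockSize(). It assumes the DataSet API and the hypothetical WordFeatures type from the earlier sketch; the paths and class names are placeholders:

    import org.apache.flink.api.common.io.SerializedInputFormat;
    import org.apache.flink.api.common.io.SerializedOutputFormat;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.core.fs.Path;

    // Sketch: use the same explicit block size for writing and reading, so the
    // result no longer depends on what block size the file system reports.
    public class FixedBlockSize {

        static final long BLOCK_SIZE = 64 * 1024 * 1024L; // 64 MB on both sides

        static void write(DataSet<WordFeatures> features, String path) {
            SerializedOutputFormat<WordFeatures> format = new SerializedOutputFormat<>();
            format.setBlockSize(BLOCK_SIZE);
            format.setOutputFilePath(new Path(path));
            features.output(format);
        }

        static DataSet<WordFeatures> read(ExecutionEnvironment env, String path) {
            SerializedInputFormat<WordFeatures> format = new SerializedInputFormat<>();
            format.setBlockSize(BLOCK_SIZE);
            format.setFilePath(new Path(path));
            return env.createInput(format, TypeInformation.of(WordFeatures.class));
        }
    }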