Re: StreamingFileSink causing AmazonS3Exception

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: StreamingFileSink causing AmazonS3Exception

Addison Higham
Oh this is timely!

I hope I can save you some pain Kostas! (cc-ing to flink dev to get
feedback there for what I believe to be a confirmed bug)


I was just about to open up a flink issue for this after digging (really)
deep and figuring out the issue over the weekend.

The problem arises due the flink hands input streams to the S3AccessHelper.
If you turn on debug logs for s3, you will eventually see this stack trace:

2018-12-17 05:55:46,546 DEBUG
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  -
FYI: failed to reset content inputstream before throwing up
java.io.IOException: Resetting to invalid mark
  at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
  at
org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
  at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
  at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
  at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
  at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
  at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
  at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
  at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
  at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
  at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
  at
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
  at
org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
  at
org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

From this, you can see that for (some reason) AWS fails to write a
multi-part chunk and then tries to reset the input stream in order to retry
but fails (because the InputStream is not mark-able)

That exception is swallowed (it seems like it should be raised up to
client, but isn't for an unknown reason). The s3-client then tries to
repeat the request using it's built in retry logic, however, because the
InputStream is consumed
and has no more bytes to write, we never fill up the expected
content-length that the s3 put request is expecting. Eventually, after it
hits the max number of retries, it fails and you get the error above.

I just started running a fix for this (which is a hack not the real
solution) here:
https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6

This whole thing is documented here:
https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html

However, I found that just using the documented property didn't appear to
work and I had to wrap the InputStream in the BufferedInputStream for it to
work.

I think the real fix is either to:

1. Use the BufferedInputStream but make it configurable
2. Refactor S3AccessHelper to have another signature that takes a File
object and change the RefCountedFSOutputStream to also be able to give a
reference the the underlying file.

I can pretty easily do this work, but would be curious the direction that
the maintainers would prefer.

Thanks,

Addison!






On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <[hidden email]>
wrote:

> Hi Steffen,
>
> Thanks for reporting this.
>
> Internally Flink does not keep any open connections to S3.  It only keeps
> buffers data internally up
> till the point they reach a min-size limit (by default 5MB) and then
> uploads them as a part of
> an MPU on one go. Given this, I will have to dig a bit dipper to see why a
> connection would timeout.
>
> If you are willing to dig into the code, all interactions with S3 pass
> through the S3AccessHelper
> class and its implementation, the HadoopS3AccessHelper. For the buffering
> and uploading logic,
> you could have a look at the S3RecoverableWriter and the
> S3RecoverableFsDataOutputStream.
>
> I will keep looking into it. In the meantime, if you find anything let us
> know.
>
> Cheers,
> Kostas
>
>
Reply | Threaded
Open this post in threaded view
|

Re: StreamingFileSink causing AmazonS3Exception

Addison Higham
Issue opened here: https://issues.apache.org/jira/browse/FLINK-11187

On Mon, Dec 17, 2018 at 2:37 PM Addison Higham <[hidden email]> wrote:

> Oh this is timely!
>
> I hope I can save you some pain Kostas! (cc-ing to flink dev to get
> feedback there for what I believe to be a confirmed bug)
>
>
> I was just about to open up a flink issue for this after digging (really)
> deep and figuring out the issue over the weekend.
>
> The problem arises due the flink hands input streams to the
> S3AccessHelper. If you turn on debug logs for s3, you will eventually see
> this stack trace:
>
> 2018-12-17 05:55:46,546 DEBUG
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  -
> FYI: failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>   at
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>   at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>   at
> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
> From this, you can see that for (some reason) AWS fails to write a
> multi-part chunk and then tries to reset the input stream in order to retry
> but fails (because the InputStream is not mark-able)
>
> That exception is swallowed (it seems like it should be raised up to
> client, but isn't for an unknown reason). The s3-client then tries to
> repeat the request using it's built in retry logic, however, because the
> InputStream is consumed
> and has no more bytes to write, we never fill up the expected
> content-length that the s3 put request is expecting. Eventually, after it
> hits the max number of retries, it fails and you get the error above.
>
> I just started running a fix for this (which is a hack not the real
> solution) here:
> https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6
>
> This whole thing is documented here:
> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
>
> However, I found that just using the documented property didn't appear to
> work and I had to wrap the InputStream in the BufferedInputStream for it to
> work.
>
> I think the real fix is either to:
>
> 1. Use the BufferedInputStream but make it configurable
> 2. Refactor S3AccessHelper to have another signature that takes a File
> object and change the RefCountedFSOutputStream to also be able to give a
> reference the the underlying file.
>
> I can pretty easily do this work, but would be curious the direction that
> the maintainers would prefer.
>
> Thanks,
>
> Addison!
>
>
>
>
>
>
> On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <
> [hidden email]> wrote:
>
>> Hi Steffen,
>>
>> Thanks for reporting this.
>>
>> Internally Flink does not keep any open connections to S3.  It only keeps
>> buffers data internally up
>> till the point they reach a min-size limit (by default 5MB) and then
>> uploads them as a part of
>> an MPU on one go. Given this, I will have to dig a bit dipper to see why
>> a connection would timeout.
>>
>> If you are willing to dig into the code, all interactions with S3 pass
>> through the S3AccessHelper
>> class and its implementation, the HadoopS3AccessHelper. For the buffering
>> and uploading logic,
>> you could have a look at the S3RecoverableWriter and the
>> S3RecoverableFsDataOutputStream.
>>
>> I will keep looking into it. In the meantime, if you find anything let us
>> know.
>>
>> Cheers,
>> Kostas
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: StreamingFileSink causing AmazonS3Exception

Padarn Wilson
In reply to this post by Addison Higham
Hi Addison, Kostas, Steffan,

I am also encountering this exact issue. I cannot find a JIRA ticket on
this, is there some planned work on implementing a fix?

@Addison - Did you manage to find a fix that you could apply without
modifying the Flink codebase? If possible it would be better not patch the
code base and compile a custom image.

Thanks,
Padarn

On Tue, Dec 18, 2018 at 5:37 AM Addison Higham <[hidden email]> wrote:

> Oh this is timely!
>
> I hope I can save you some pain Kostas! (cc-ing to flink dev to get
> feedback there for what I believe to be a confirmed bug)
>
>
> I was just about to open up a flink issue for this after digging (really)
> deep and figuring out the issue over the weekend.
>
> The problem arises due the flink hands input streams to the
> S3AccessHelper. If you turn on debug logs for s3, you will eventually see
> this stack trace:
>
> 2018-12-17 05:55:46,546 DEBUG
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  -
> FYI: failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>   at
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>   at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>   at
> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
> From this, you can see that for (some reason) AWS fails to write a
> multi-part chunk and then tries to reset the input stream in order to retry
> but fails (because the InputStream is not mark-able)
>
> That exception is swallowed (it seems like it should be raised up to
> client, but isn't for an unknown reason). The s3-client then tries to
> repeat the request using it's built in retry logic, however, because the
> InputStream is consumed
> and has no more bytes to write, we never fill up the expected
> content-length that the s3 put request is expecting. Eventually, after it
> hits the max number of retries, it fails and you get the error above.
>
> I just started running a fix for this (which is a hack not the real
> solution) here:
> https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6
>
> This whole thing is documented here:
> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
>
> However, I found that just using the documented property didn't appear to
> work and I had to wrap the InputStream in the BufferedInputStream for it to
> work.
>
> I think the real fix is either to:
>
> 1. Use the BufferedInputStream but make it configurable
> 2. Refactor S3AccessHelper to have another signature that takes a File
> object and change the RefCountedFSOutputStream to also be able to give a
> reference the the underlying file.
>
> I can pretty easily do this work, but would be curious the direction that
> the maintainers would prefer.
>
> Thanks,
>
> Addison!
>
>
>
>
>
>
> On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <
> [hidden email]> wrote:
>
>> Hi Steffen,
>>
>> Thanks for reporting this.
>>
>> Internally Flink does not keep any open connections to S3.  It only keeps
>> buffers data internally up
>> till the point they reach a min-size limit (by default 5MB) and then
>> uploads them as a part of
>> an MPU on one go. Given this, I will have to dig a bit dipper to see why
>> a connection would timeout.
>>
>> If you are willing to dig into the code, all interactions with S3 pass
>> through the S3AccessHelper
>> class and its implementation, the HadoopS3AccessHelper. For the buffering
>> and uploading logic,
>> you could have a look at the S3RecoverableWriter and the
>> S3RecoverableFsDataOutputStream.
>>
>> I will keep looking into it. In the meantime, if you find anything let us
>> know.
>>
>> Cheers,
>> Kostas
>>
>>

--
_Grab is hiring. Learn more at *https://grab.careers 
<https://grab.careers/>*_


By communicating with Grab Inc and/or its
subsidiaries, associate companies and jointly controlled entities (“Grab
Group”), you are deemed to have consented to processing of your personal
data as set out in the Privacy Notice which can be viewed at 
https://grab.com/privacy/ <https://grab.com/privacy/>


This email contains
confidential information and is only for the intended recipient(s). If you
are not the intended recipient(s), please do not disseminate, distribute or
copy this email and notify Grab Group immediately if you have received this
by mistake and delete this email from your system. Email transmission
cannot be guaranteed to be secure or error-free as any information therein
could be intercepted, corrupted, lost, destroyed, delayed or incomplete, or
contain viruses. Grab Group do not accept liability for any errors or
omissions in the contents of this email arises as a result of email
transmission. All intellectual property rights in this email and
attachments therein shall remain vested in Grab Group, unless otherwise
provided by law.

Reply | Threaded
Open this post in threaded view
|

Re: StreamingFileSink causing AmazonS3Exception

Kostas Kloudas-3
Hi Padarn,

This is the jira issue:  https://issues.apache.org/jira/browse/FLINK-11187
and the fix, as you can see, was first included in version 1.7.2.

Cheers,
Kostas

On Mon, Feb 18, 2019 at 3:49 AM Padarn Wilson <[hidden email]>
wrote:

> Hi Addison, Kostas, Steffan,
>
> I am also encountering this exact issue. I cannot find a JIRA ticket on
> this, is there some planned work on implementing a fix?
>
> @Addison - Did you manage to find a fix that you could apply without
> modifying the Flink codebase? If possible it would be better not patch the
> code base and compile a custom image.
>
> Thanks,
> Padarn
>
> On Tue, Dec 18, 2018 at 5:37 AM Addison Higham <[hidden email]> wrote:
>
>> Oh this is timely!
>>
>> I hope I can save you some pain Kostas! (cc-ing to flink dev to get
>> feedback there for what I believe to be a confirmed bug)
>>
>>
>> I was just about to open up a flink issue for this after digging (really)
>> deep and figuring out the issue over the weekend.
>>
>> The problem arises due the flink hands input streams to the
>> S3AccessHelper. If you turn on debug logs for s3, you will eventually see
>> this stack trace:
>>
>> 2018-12-17 05:55:46,546 DEBUG
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  -
>> FYI: failed to reset content inputstream before throwing up
>> java.io.IOException: Resetting to invalid mark
>>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>>   at
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>>   at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>>   at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>>   at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>>   at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>>   at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>>   at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>>   at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>>   at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>>   at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>>   at
>> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>>   at
>> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>>   at
>> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>>   at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> From this, you can see that for (some reason) AWS fails to write a
>> multi-part chunk and then tries to reset the input stream in order to retry
>> but fails (because the InputStream is not mark-able)
>>
>> That exception is swallowed (it seems like it should be raised up to
>> client, but isn't for an unknown reason). The s3-client then tries to
>> repeat the request using it's built in retry logic, however, because the
>> InputStream is consumed
>> and has no more bytes to write, we never fill up the expected
>> content-length that the s3 put request is expecting. Eventually, after it
>> hits the max number of retries, it fails and you get the error above.
>>
>> I just started running a fix for this (which is a hack not the real
>> solution) here:
>> https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6
>>
>> This whole thing is documented here:
>> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
>>
>> However, I found that just using the documented property didn't appear to
>> work and I had to wrap the InputStream in the BufferedInputStream for it to
>> work.
>>
>> I think the real fix is either to:
>>
>> 1. Use the BufferedInputStream but make it configurable
>> 2. Refactor S3AccessHelper to have another signature that takes a File
>> object and change the RefCountedFSOutputStream to also be able to give a
>> reference the the underlying file.
>>
>> I can pretty easily do this work, but would be curious the direction that
>> the maintainers would prefer.
>>
>> Thanks,
>>
>> Addison!
>>
>>
>>
>>
>>
>>
>> On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <
>> [hidden email]> wrote:
>>
>>> Hi Steffen,
>>>
>>> Thanks for reporting this.
>>>
>>> Internally Flink does not keep any open connections to S3.  It only
>>> keeps buffers data internally up
>>> till the point they reach a min-size limit (by default 5MB) and then
>>> uploads them as a part of
>>> an MPU on one go. Given this, I will have to dig a bit dipper to see why
>>> a connection would timeout.
>>>
>>> If you are willing to dig into the code, all interactions with S3 pass
>>> through the S3AccessHelper
>>> class and its implementation, the HadoopS3AccessHelper. For the
>>> buffering and uploading logic,
>>> you could have a look at the S3RecoverableWriter and the
>>> S3RecoverableFsDataOutputStream.
>>>
>>> I will keep looking into it. In the meantime, if you find anything let
>>> us know.
>>>
>>> Cheers,
>>> Kostas
>>>
>>>
> *Grab is hiring. Learn more at **https://grab.careers
> <https://grab.careers/>*
>
> By communicating with Grab Inc and/or its subsidiaries, associate
> companies and jointly controlled entities (“Grab Group”), you are deemed to
> have consented to processing of your personal data as set out in the
> Privacy Notice which can be viewed at https://grab.com/privacy/
>
> This email contains confidential information and is only for the intended
> recipient(s). If you are not the intended recipient(s), please do not
> disseminate, distribute or copy this email and notify Grab Group
> immediately if you have received this by mistake and delete this email from
> your system. Email transmission cannot be guaranteed to be secure or
> error-free as any information therein could be intercepted, corrupted,
> lost, destroyed, delayed or incomplete, or contain viruses. Grab Group do
> not accept liability for any errors or omissions in the contents of this
> email arises as a result of email transmission. All intellectual property
> rights in this email and attachments therein shall remain vested in Grab
> Group, unless otherwise provided by law.
>


--

Kostas Kloudas | Software Engineer


<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
Reply | Threaded
Open this post in threaded view
|

Re: StreamingFileSink causing AmazonS3Exception

Padarn Wilson
Thanks Kostas!

On Mon, Feb 18, 2019 at 5:10 PM Kostas Kloudas <[hidden email]>
wrote:

> Hi Padarn,
>
> This is the jira issue:  https://issues.apache.org/jira/browse/FLINK-11187
> and the fix, as you can see, was first included in version 1.7.2.
>
> Cheers,
> Kostas
>
> On Mon, Feb 18, 2019 at 3:49 AM Padarn Wilson <[hidden email]>
> wrote:
>
>> Hi Addison, Kostas, Steffan,
>>
>> I am also encountering this exact issue. I cannot find a JIRA ticket on
>> this, is there some planned work on implementing a fix?
>>
>> @Addison - Did you manage to find a fix that you could apply without
>> modifying the Flink codebase? If possible it would be better not patch the
>> code base and compile a custom image.
>>
>> Thanks,
>> Padarn
>>
>> On Tue, Dec 18, 2018 at 5:37 AM Addison Higham <[hidden email]>
>> wrote:
>>
>>> Oh this is timely!
>>>
>>> I hope I can save you some pain Kostas! (cc-ing to flink dev to get
>>> feedback there for what I believe to be a confirmed bug)
>>>
>>>
>>> I was just about to open up a flink issue for this after digging
>>> (really) deep and figuring out the issue over the weekend.
>>>
>>> The problem arises due the flink hands input streams to the
>>> S3AccessHelper. If you turn on debug logs for s3, you will eventually see
>>> this stack trace:
>>>
>>> 2018-12-17 05:55:46,546 DEBUG
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  -
>>> FYI: failed to reset content inputstream before throwing up
>>> java.io.IOException: Resetting to invalid mark
>>>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>>>   at
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>>>   at
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>>>   at
>>> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>>>   at
>>> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>>>   at
>>> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>>>   at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>   at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>   at java.lang.Thread.run(Thread.java:748)
>>>
>>> From this, you can see that for (some reason) AWS fails to write a
>>> multi-part chunk and then tries to reset the input stream in order to retry
>>> but fails (because the InputStream is not mark-able)
>>>
>>> That exception is swallowed (it seems like it should be raised up to
>>> client, but isn't for an unknown reason). The s3-client then tries to
>>> repeat the request using it's built in retry logic, however, because the
>>> InputStream is consumed
>>> and has no more bytes to write, we never fill up the expected
>>> content-length that the s3 put request is expecting. Eventually, after it
>>> hits the max number of retries, it fails and you get the error above.
>>>
>>> I just started running a fix for this (which is a hack not the real
>>> solution) here:
>>> https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6
>>>
>>> This whole thing is documented here:
>>> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
>>>
>>> However, I found that just using the documented property didn't appear
>>> to work and I had to wrap the InputStream in the BufferedInputStream for it
>>> to work.
>>>
>>> I think the real fix is either to:
>>>
>>> 1. Use the BufferedInputStream but make it configurable
>>> 2. Refactor S3AccessHelper to have another signature that takes a File
>>> object and change the RefCountedFSOutputStream to also be able to give a
>>> reference the the underlying file.
>>>
>>> I can pretty easily do this work, but would be curious the direction
>>> that the maintainers would prefer.
>>>
>>> Thanks,
>>>
>>> Addison!
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <
>>> [hidden email]> wrote:
>>>
>>>> Hi Steffen,
>>>>
>>>> Thanks for reporting this.
>>>>
>>>> Internally Flink does not keep any open connections to S3.  It only
>>>> keeps buffers data internally up
>>>> till the point they reach a min-size limit (by default 5MB) and then
>>>> uploads them as a part of
>>>> an MPU on one go. Given this, I will have to dig a bit dipper to see
>>>> why a connection would timeout.
>>>>
>>>> If you are willing to dig into the code, all interactions with S3 pass
>>>> through the S3AccessHelper
>>>> class and its implementation, the HadoopS3AccessHelper. For the
>>>> buffering and uploading logic,
>>>> you could have a look at the S3RecoverableWriter and the
>>>> S3RecoverableFsDataOutputStream.
>>>>
>>>> I will keep looking into it. In the meantime, if you find anything let
>>>> us know.
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>>
>> *Grab is hiring. Learn more at **https://grab.careers
>> <https://grab.careers/>*
>>
>> By communicating with Grab Inc and/or its subsidiaries, associate
>> companies and jointly controlled entities (“Grab Group”), you are deemed to
>> have consented to processing of your personal data as set out in the
>> Privacy Notice which can be viewed at https://grab.com/privacy/
>>
>> This email contains confidential information and is only for the intended
>> recipient(s). If you are not the intended recipient(s), please do not
>> disseminate, distribute or copy this email and notify Grab Group
>> immediately if you have received this by mistake and delete this email from
>> your system. Email transmission cannot be guaranteed to be secure or
>> error-free as any information therein could be intercepted, corrupted,
>> lost, destroyed, delayed or incomplete, or contain viruses. Grab Group do
>> not accept liability for any errors or omissions in the contents of this
>> email arises as a result of email transmission. All intellectual property
>> rights in this email and attachments therein shall remain vested in Grab
>> Group, unless otherwise provided by law.
>>
>
>
> --
>
> Kostas Kloudas | Software Engineer
>
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>

--
_Grab is hiring. Learn more at *https://grab.careers 
<https://grab.careers/>*_


By communicating with Grab Inc and/or its
subsidiaries, associate companies and jointly controlled entities (“Grab
Group”), you are deemed to have consented to processing of your personal
data as set out in the Privacy Notice which can be viewed at 
https://grab.com/privacy/ <https://grab.com/privacy/>


This email contains
confidential information and is only for the intended recipient(s). If you
are not the intended recipient(s), please do not disseminate, distribute or
copy this email and notify Grab Group immediately if you have received this
by mistake and delete this email from your system. Email transmission
cannot be guaranteed to be secure or error-free as any information therein
could be intercepted, corrupted, lost, destroyed, delayed or incomplete, or
contain viruses. Grab Group do not accept liability for any errors or
omissions in the contents of this email arises as a result of email
transmission. All intellectual property rights in this email and
attachments therein shall remain vested in Grab Group, unless otherwise
provided by law.