Using md5 hash while sinking files to s3

Using md5 hash while sinking files to s3

nikita Balakrishnan
Hello team,

I’m developing a system where we sink files to an immutable S3 bucket, i.e. one
with Object Lock enabled. The bucket has server-side encryption set to SSE-KMS.
The DataStream sink works perfectly fine against a regular bucket, but against
the immutable bucket I get exceptions about multipart upload failures: S3
rejects the part uploads, saying the Content-MD5 header must be set for them
to succeed.
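
For context, the sink is wired up roughly like this (a trimmed sketch; the
bucket name, path, and string encoder are stand-ins for our actual setup, and
stream is the DataStream being written):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Row-format StreamingFileSink writing to the Object Lock-enabled bucket via s3a://
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3a://<bucket-name>/raw_events"),
                new SimpleStringEncoder<String>("UTF-8"))
        .build();

stream.addSink(sink);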

Here’s the stack trace:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.io.IOException: Uploading parts failed
    ... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
    at org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
    ... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx; S3 Extended Request ID: xxxx), S3 Extended Request ID: xxxxxx :InvalidRequest: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxxx; S3 Extended Request ID: xxxx)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
    at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 common frames omitted
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxxxx; S3 Extended Request ID: xxxx)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
    at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 12 common frames omitted


My questions are:

1. Is the MD5 hash actually mandatory here? The first part of the file always
gets uploaded to S3, but every part after that fails. According to the AWS S3
documentation for buckets with Object Lock, it is mandatory -
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
“The Content-MD5 header is required for any request to upload an object
with a retention period configured using Amazon S3 Object Lock. For more
information about Amazon S3 Object Lock, see Amazon S3 Object Lock Overview
<https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock-overview.html> in
the *Amazon Simple Storage Service Developer Guide*.”

2. If it is mandatory, how do I set this HTTP header while sinking? I checked
most of the documentation and went through the source code as well, but
couldn’t find any provision for setting headers on the requests the sink sends.

Re: Using md5 hash while sinking files to s3

Chesnay Schepler-3
Please try configuring:

fs.s3a.etag.checksum.enabled: true
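
In flink-conf.yaml that would look like the snippet below; as far as I know,
the flink-s3-fs-hadoop plugin forwards keys with the s3., s3a., and fs.s3a.
prefixes into the underlying Hadoop configuration, so either spelling should
reach the S3A client:

# flink-conf.yaml
fs.s3a.etag.checksum.enabled: true
# equivalently, via the short prefix that Flink mirrors to fs.s3a.:
# s3.etag.checksum.enabled: true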

Re: Using md5 hash while sinking files to s3

nikita Balakrishnan
Hey Chesnay,

Thank you for getting back on that! I tried setting that too, and it still
gives me the same exception. Is there something else I'm missing?
I also have fs.s3a.bucket.<bucket-name>.server-side-encryption-algorithm=SSE-KMS
and fs.s3a.bucket.<bucket-name>.server-side-encryption.key set.

Is there no need to set the MD5 hash value manually while sinking? Will
fs.s3a.etag.checksum.enabled: true take care of it for me, or do I need to
specify somewhere else that MD5 hashing should be used?
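
For reference, the relevant part of my flink-conf.yaml currently looks roughly
like this (<bucket-name> and <kms-key-arn> are placeholders):

# flink-conf.yaml
fs.s3a.bucket.<bucket-name>.server-side-encryption-algorithm: SSE-KMS
fs.s3a.bucket.<bucket-name>.server-side-encryption.key: <kms-key-arn>
fs.s3a.etag.checksum.enabled: true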

Re: Using md5 hash while sinking files to s3

Chesnay Schepler-3
I only quickly skimmed the Hadoop docs and found this option (although it is
not documented very well, I might add). If it does not do the trick, I'd
suggest reaching out to the Hadoop project, since we're using their S3
filesystem.
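
If you want to rule out the Flink-side key forwarding, you could also try
setting the option directly in a core-site.xml on the classpath - a sketch of
the same key at the Hadoop level:

<!-- core-site.xml -->
<configuration>
  <property>
    <name>fs.s3a.etag.checksum.enabled</name>
    <value>true</value>
  </property>
</configuration>

If it took effect, running hadoop fs -checksum s3a://<bucket-name>/<object>
should report an etag-based checksum once the option is active.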