Mario Georgiev created FLINK-11429:
---------------------------------------

             Summary: Flink fails to authenticate s3a with core-site.xml
                 Key: FLINK-11429
                 URL: https://issues.apache.org/jira/browse/FLINK-11429
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.7.1
            Reporter: Mario Georgiev

Hello,

The problem is: if I put the core-site.xml somewhere in the Flink image and put the path to it in flink-conf.yaml, it does not get picked up and I get an exception (full stack trace below).
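For reference, the exact entry that points Flink at the Hadoop configuration directory (appended to flink-conf.yaml by the Dockerfile further down) is:

{code:java}
fs.hdfs.hadoopconf: /etc/hadoop/conf
{code}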
{code:java}
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1337)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1277)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:373)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
	... 31 more
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:117)
	... 48 more
Caused by: java.net.SocketException: Network unreachable (connect failed)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
	at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
	at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
	at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
	at sun.net.www.http.HttpClient.New(HttpClient.java:339)
	at sun.net.www.http.HttpClient.New(HttpClient.java:357)
	at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
	at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
	at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
	at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
{code}

However, if I put the ACCESS_KEY and the SECRET_KEY in ENV variables in the Dockerfile, they do get picked up and it works. Why is the core-site.xml being disregarded?

{code:java}
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <!-- Comma separated list of local directories used to buffer large results prior to transmitting them to S3. -->
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <description>AWS access key ID. Omit for IAM role-based or provider-based authentication.</description>
    <value><hidden></value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <description>AWS secret key. Omit for IAM role-based or provider-based authentication.</description>
    <value><hidden></value>
  </property>
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
  </property>
</configuration>
{code}
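For what it is worth, the Flink docs for the shaded S3 filesystems suggest the same credentials can also be given directly in flink-conf.yaml instead of core-site.xml; a minimal sketch of that variant (same <hidden> values as above, untested here):

{code:java}
s3.access-key: <hidden>
s3.secret-key: <hidden>
{code}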
I am building the Kubernetes standalone image as follows. Dockerfile:

{code:java}
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

FROM openjdk:8-jre-alpine

# Install requirements
# Modification to original Dockerfile to support rocksdb
# RUN apk add --no-cache bash snappy
# This is a fix for RocksDB compatibility

# Flink environment variables
ENV FLINK_INSTALL_PATH=/opt
ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
ENV FLINK_LIB_DIR $FLINK_HOME/lib
ENV PATH $PATH:$FLINK_HOME/bin
ENV FLINK_CONF $FLINK_HOME/conf

# flink-dist can point to a directory or a tarball on the local system
ARG flink_dist=NOT_SET
ARG job_jar=NOT_SET

# Install build dependencies and flink
ADD $flink_dist $FLINK_INSTALL_PATH
ADD $job_jar $FLINK_INSTALL_PATH/job.jar
RUN set -x && \
  ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
  ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
  addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
  chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
  chown -h flink:flink $FLINK_HOME

# Modification to original Dockerfile
RUN apk add --no-cache bash libc6-compat snappy 'su-exec>=0.2'

COPY core-site.xml /etc/hadoop/conf/core-site.xml
RUN echo "fs.hdfs.hadoopconf: /etc/hadoop/conf" >> $FLINK_CONF/flink-conf.yaml

COPY docker-entrypoint.sh /
RUN chmod +x docker-entrypoint.sh

RUN wget -O $FLINK_LIB_DIR/hadoop-aws-2.7.3.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-s3-1.11.183.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.183/aws-java-sdk-s3-1.11.183.jar
RUN wget -O $FLINK_LIB_DIR/flink-s3-fs-hadoop-1.7.1.jar http://central.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.7.1/flink-s3-fs-hadoop-1.7.1.jar
# Transitive dependencies of aws-java-sdk-s3
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-core-1.11.183.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.183/aws-java-sdk-core-1.11.183.jar
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-kms-1.11.183.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.183/aws-java-sdk-kms-1.11.183.jar
RUN wget -O $FLINK_LIB_DIR/jackson-annotations-2.6.7.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.6.7/jackson-annotations-2.6.7.jar
RUN wget -O $FLINK_LIB_DIR/jackson-core-2.6.7.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.6.7/jackson-core-2.6.7.jar
RUN wget -O $FLINK_LIB_DIR/jackson-databind-2.6.7.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.6.7/jackson-databind-2.6.7.jar
RUN wget -O $FLINK_LIB_DIR/joda-time-2.8.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
RUN wget -O $FLINK_LIB_DIR/httpcore-4.4.4.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar
RUN wget -O $FLINK_LIB_DIR/httpclient-4.5.3.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.3/httpclient-4.5.3.jar

# Modification to original Dockerfile
USER flink
EXPOSE 8081 6123
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["--help"]
{code}
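For completeness, the image is built with the two build args the Dockerfile declares; a sketch (the dist tarball and job jar names here are placeholders for whatever is on the local machine):

{code:java}
docker build \
  --build-arg flink_dist=flink-1.7.1-bin-scala_2.11.tgz \
  --build-arg job_jar=target/job.jar \
  -t flink-job-cluster:1.7.1 .
{code}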
The job is a simple word count:

{code:java}
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class WordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        String[] words1 = new String[]{
            "football", "soccer", "billiards", "snooker", "tennis", "handball", "basketball"
        };
        List<String> words = new ArrayList<>();
        Random rnd = new Random();
        for (int i = 0; i < 500000; i++) {
            words.add(words1[rnd.nextInt(words1.length)]);
        }

        DataStreamSource<String> src = env.fromElements(words.toArray(new String[]{}));
        src.map(str -> str.toLowerCase())
           .flatMap(new Splitter())
           .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
           .keyBy(0)
           .sum(1)
           .print();
        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
{code}

Job manager Kubernetes args (it is a template, so disregard the placeholders):

{code:java}
"job-cluster",
"--job-classname", "{classname}",
"-Djobmanager.rpc.address={cluster.name}-jobmanager",
"-Dparallelism.default=2",
"-Dblob.server.port=6124",
"-Dqueryable-state.server.ports=6125",
"-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
"-Dstate.backend=rocksdb",
"-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
"-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
"-Dstate.backend.incremental=true"
{code}

Task manager Kubernetes args (again, templated):
"-Djobmanager.rpc.address={cluster.name}-jobmanager", "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/", "-Dstate.backend=rocksdb", "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}", "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}", "-Dstate.backend.incremental=true"] {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) |