Victor Wong created FLINK-12648:
-----------------------------------

             Summary: Load Hadoop file system via FileSystem.get()
                 Key: FLINK-12648
                 URL: https://issues.apache.org/jira/browse/FLINK-12648
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / FileSystem
            Reporter: Victor Wong
            Assignee: Victor Wong


I think there is some code in _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ that duplicates logic already provided by the Apache hadoop-common dependency. We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, org.apache.hadoop.conf.Configuration)_ to remove this duplication.

Replace

{code:java}
// -- (2) get the Hadoop file system class for that scheme

final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
try {
    fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
}
catch (IOException e) {
    throw new UnsupportedFileSystemSchemeException(
            "Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
                    "Either no file system implementation exists for that scheme, " +
                    "or the relevant classes are missing from the classpath.", e);
}

// -- (3) instantiate the Hadoop file system

LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", scheme, fsClass.getName());

final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();

// -- (4) create the proper URI to initialize the file system

final URI initUri;
if (fsUri.getAuthority() != null) {
    initUri = fsUri;
}
else {
    LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)");

    String configEntry = hadoopConfig.get("fs.defaultFS", null);
    if (configEntry == null) {
        // fs.default.name deprecated as of hadoop 2.2.0 - see
        // http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
        configEntry = hadoopConfig.get("fs.default.name", null);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
    }

    if (configEntry == null) {
        throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
    }
    else {
        try {
            initUri = URI.create(configEntry);
        }
        catch (IllegalArgumentException e) {
            throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                    "The configuration contains an invalid file system default name " +
                    "('fs.default.name' or 'fs.defaultFS'): " + configEntry);
        }

        if (initUri.getAuthority() == null) {
            throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                    "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
                    "contains no valid authority component (like hdfs namenode, S3 host, etc)");
        }
    }
}

// -- (5) configure the Hadoop file system

try {
    hadoopFs.initialize(initUri, hadoopConfig);
}
catch (UnknownHostException e) {
    String message = "The Hadoop file system's authority (" + initUri.getAuthority()
            + "), specified by either the file URI or the configuration, cannot be resolved.";

    throw new IOException(message, e);
}
{code}

with

{code:java}
final org.apache.hadoop.fs.FileSystem hadoopFs =
        org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
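For context, here is a minimal sketch of what _HadoopFsFactory#create_ could look like after the change. This is only an illustration, not the actual patch: _getHadoopConfiguration()_ is a hypothetical stand-in for however the factory already builds the Hadoop _Configuration_, and the wrapping into Flink's _HadoopFileSystem_ mirrors what the existing factory does.

{code:java}
// Hypothetical sketch (not the actual patch): HadoopFsFactory#create reduced to
// delegating class lookup, instantiation, the 'fs.defaultFS' fallback and
// initialize() to Hadoop itself via FileSystem.get(URI, Configuration).
@Override
public FileSystem create(URI fsUri) throws IOException {
    // 'getHadoopConfiguration()' stands in for however the factory already
    // derives the org.apache.hadoop.conf.Configuration (assumed helper).
    final org.apache.hadoop.conf.Configuration hadoopConfig = getHadoopConfiguration();

    // FileSystem.get() resolves the scheme to an implementation class,
    // falls back to 'fs.defaultFS' when the URI carries no authority,
    // and calls initialize() on the instance - steps (2) through (5) above.
    final org.apache.hadoop.fs.FileSystem hadoopFs =
            org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);

    // Wrap the Hadoop file system in Flink's FileSystem abstraction.
    return new HadoopFileSystem(hadoopFs);
}
{code}

One behavioral difference to keep in mind: _FileSystem.get()_ may return a cached instance (Hadoop caches per scheme/authority/user unless _fs.<scheme>.impl.disable.cache_ is set), whereas the current code always instantiates a fresh object. If per-call instantiation should be preserved, _org.apache.hadoop.fs.FileSystem#newInstance(java.net.URI, org.apache.hadoop.conf.Configuration)_ bypasses the cache.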