[jira] [Created] (FLINK-12648) Load Hadoop file system via FileSystem.get()

Shang Yuanchun (Jira)
Victor Wong created FLINK-12648:
-----------------------------------

             Summary: Load Hadoop file system via FileSystem.get()
                 Key: FLINK-12648
                 URL: https://issues.apache.org/jira/browse/FLINK-12648
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / FileSystem
            Reporter: Victor Wong
            Assignee: Victor Wong


I think there is duplicated code in _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ that reimplements logic already provided by the Apache hadoop-common dependency.

We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, org.apache.hadoop.conf.Configuration)_ to remove the duplicated code.

 

Replace
{code:java}
// -- (2) get the Hadoop file system class for that scheme

final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
try {
   fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
}
catch (IOException e) {
   throw new UnsupportedFileSystemSchemeException(
         "Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
               "Either no file system implementation exists for that scheme, " +
               "or the relevant classes are missing from the classpath.", e);
}

// -- (3) instantiate the Hadoop file system

LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", scheme, fsClass.getName());

final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();

// -- (4) create the proper URI to initialize the file system

final URI initUri;
if (fsUri.getAuthority() != null) {
   initUri = fsUri;
}
else {
   LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)");

   String configEntry = hadoopConfig.get("fs.defaultFS", null);
   if (configEntry == null) {
      // fs.default.name deprecated as of hadoop 2.2.0 - see
      // http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
      configEntry = hadoopConfig.get("fs.default.name", null);
   }

   if (LOG.isDebugEnabled()) {
      LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
   }

   if (configEntry == null) {
      throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
            "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
   }
   else {
      try {
         initUri = URI.create(configEntry);
      }
      catch (IllegalArgumentException e) {
         throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
               "The configuration contains an invalid file system default name " +
               "('fs.default.name' or 'fs.defaultFS'): " + configEntry);
      }

      if (initUri.getAuthority() == null) {
         throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
               "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
               "contains no valid authority component (like hdfs namenode, S3 host, etc)");
      }
   }
}

// -- (5) configure the Hadoop file system

try {
   hadoopFs.initialize(initUri, hadoopConfig);
}
catch (UnknownHostException e) {
   String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
         "), specified by either the file URI or the configuration, cannot be resolved.";

   throw new IOException(message, e);
}
{code}
with
{code:java}
final org.apache.hadoop.fs.FileSystem hadoopFs = org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
{code}
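
One behavioral difference is worth noting (my own observation, not part of the original snippet): the current factory obtains a fresh file system via _fsClass.newInstance()_, whereas _FileSystem.get()_ returns an instance from Hadoop's internal cache, keyed by scheme, authority and user, unless _fs.<scheme>.impl.disable.cache_ is set. If the "fresh instance per call" semantics should be preserved, a minimal sketch could use _org.apache.hadoop.fs.FileSystem#newInstance(java.net.URI, org.apache.hadoop.conf.Configuration)_ instead, reusing the same _fsUri_ and _hadoopConfig_ variables as above:
{code:java}
// Sketch only (not the final patch): keeps the "fresh instance per call" behavior
// of the original fsClass.newInstance() by returning a new, non-shared instance
// rather than one from Hadoop's FileSystem cache.
final org.apache.hadoop.fs.FileSystem hadoopFs;
try {
   // FileSystem.newInstance() resolves the scheme, falls back to fs.defaultFS for
   // URIs without an authority (when the scheme matches the default file system),
   // and initializes the file system in one call, covering steps (2) to (5) above.
   hadoopFs = org.apache.hadoop.fs.FileSystem.newInstance(fsUri, hadoopConfig);
}
catch (IOException e) {
   // Hadoop reports unknown schemes, missing classes and unresolvable authorities
   // all as IOException here, so the error messages are coarser than in the
   // original steps (2) to (5).
   throw new IOException("Cannot instantiate Hadoop file system for URI " + fsUri, e);
}
{code}
The main design question is whether a cached, shared instance from _FileSystem.get()_ is acceptable for Flink's usage; if it is, the one-liner above is sufficient.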



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)