Hadoop Source Code Analysis: The FileSystem Class

1. The org.apache.hadoop.conf package

The org.apache.hadoop.conf package is part of the hadoop-common module.

1.1 The Configurable interface

package org.apache.hadoop.conf;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** Something that may be configured with a {@link Configuration}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Configurable {

  /** Set the configuration to be used by this object. */
  void setConf(Configuration conf);

  /** Return the configuration used by this object. */
  Configuration getConf();
}

1.2 The Configured class

package org.apache.hadoop.conf;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** Base class for things that may be configured with a {@link Configuration}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Configured implements Configurable {

  private Configuration conf;

  /** Construct a Configured. */
  public Configured() {
    this(null);
  }

  /** Construct a Configured. */
  public Configured(Configuration conf) {
    setConf(conf);
  }

  // inherit javadoc
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  // inherit javadoc
  @Override
  public Configuration getConf() {
    return conf;
  }

}
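To see why the Configurable/Configured split is useful, here is a minimal sketch that mirrors the two types with simplified stand-ins. SimpleConf, SimpleConfigurable, SimpleConfigured and BufferSizer are hypothetical names, and SimpleConf is just a map, not the real Configuration. The point is that a component extends the Configured-style base and reads its settings through getConf() instead of managing the configuration itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration (illustration only)
class SimpleConf {
    private final Map<String, String> props = new HashMap<>();
    void set(String name, String value) { props.put(name, value); }
    String get(String name, String defaultValue) { return props.getOrDefault(name, defaultValue); }
}

// Same shape as the Configurable interface
interface SimpleConfigurable {
    void setConf(SimpleConf conf);
    SimpleConf getConf();
}

// Same shape as Configured: the constructor simply stores the conf via setConf()
class SimpleConfigured implements SimpleConfigurable {
    private SimpleConf conf;
    SimpleConfigured() { this(null); }
    SimpleConfigured(SimpleConf conf) { setConf(conf); }
    @Override public void setConf(SimpleConf conf) { this.conf = conf; }
    @Override public SimpleConf getConf() { return conf; }
}

// Hypothetical component that inherits conf handling instead of re-implementing it
class BufferSizer extends SimpleConfigured {
    BufferSizer(SimpleConf conf) { super(conf); }
    int bufferSize() {
        // Read a setting from the shared conf, falling back to a default
        return Integer.parseInt(getConf().get("io.file.buffer.size", "4096"));
    }
}
```

Because BufferSizer holds a reference to the shared conf rather than a copy, a value set later is visible on the next read.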

1.3 The Configuration class

package org.apache.hadoop.conf;

import ...

@InterfaceAudience.Public
@InterfaceStability.Stable
public class Configuration implements Iterable<Map.Entry<String,String>>,
                                      Writable {
  ...

  private static class Resource {
    private final Object resource;
    private final String name;

    public Resource(Object resource) {
      this(resource, resource.toString());
    }

    public Resource(Object resource, String name) {
      this.resource = resource;
      this.name = name;
    }

    public String getName(){
      return name;
    }

    public Object getResource() {
      return resource;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  /** 
   * Set the <code>value</code> of the <code>name</code> property. If 
   * <code>name</code> is deprecated or there is a deprecated name associated to it,
   * it sets the value to both names. Name will be trimmed before put into
   * configuration.
   * 
   * @param name property name.
   * @param value property value.
   */
  public void set(String name, String value) {
    set(name, value, null);
  }

  /**
   * Add a configuration resource. 
   * 
   * The properties of this resource will override properties of previously 
   * added resources, unless they were marked <a href="#Final">final</a>. 
   * 
   * @param name resource to be added, the classpath is examined for a file 
   *             with that name.
   */
  public void addResource(String name) {
    addResourceObject(new Resource(name));
  }
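The addResource() javadoc says that a later resource overrides earlier ones unless a property was marked final, and the set() javadoc says the name is trimmed before it is stored. The sketch below models only those two rules with plain maps; LayeredConf is hypothetical and ignores deprecated names, variable expansion and XML parsing.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of resource layering in Configuration (illustration only):
// later resources override earlier ones, except for properties marked final.
class LayeredConf {
    private final Map<String, String> props = new HashMap<>();
    private final Set<String> finalNames = new HashSet<>();

    // Stand-in for addResource(): one "file" of properties plus the names it marks final
    void addResource(Map<String, String> resource, Set<String> finals) {
        for (Map.Entry<String, String> e : resource.entrySet()) {
            String name = e.getKey();
            if (finalNames.contains(name)) {
                continue; // a final value from an earlier resource is kept
            }
            props.put(name, e.getValue());
            if (finals.contains(name)) {
                finalNames.add(name);
            }
        }
    }

    // Like set(): the name is trimmed before it is stored (deprecation is not modeled)
    void set(String name, String value) {
        props.put(name.trim(), value);
    }

    String get(String name) {
        return props.get(name);
    }
}
```

Loading a "core" resource first and a "site" resource second reproduces the usual pattern where site settings win, except for anything the earlier resource locked as final.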

2. The org.apache.hadoop.fs package

The org.apache.hadoop.fs package is also part of the hadoop-common module.

2.1 FileSystem

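Hadoop has an abstract notion of a file system, of which HDFS is only one implementation. The abstraction is defined by the abstract class org.apache.hadoop.fs.FileSystem, which extends org.apache.hadoop.conf.Configured and implements the java.io.Closeable interface. The class provides a rich set of methods for operating on a file system, such as creating directories, deleting files and renaming.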
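The idea of one abstract type with several backends fits in a few lines. MiniFs and InMemoryFs below are hypothetical and much smaller than the real API (paths are plain strings, there is no I/O), and InMemoryFs is not how HDFS or the local file system store data; it only shows callers programming against the abstract type.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified analogue of the abstract FileSystem class:
// callers program against MiniFs, and each backend supplies the behavior.
abstract class MiniFs {
    abstract String getScheme();
    abstract boolean mkdirs(String path);
    abstract boolean exists(String path);
    abstract boolean delete(String path);
}

// One possible backend that only keeps directory names in memory
class InMemoryFs extends MiniFs {
    private final Set<String> paths = new HashSet<>();

    @Override
    String getScheme() { return "mem"; }

    @Override
    boolean mkdirs(String path) {
        // Record the path and all of its parents, like "mkdir -p"
        String p = path;
        while (!p.isEmpty()) {
            paths.add(p);
            int slash = p.lastIndexOf('/');
            p = slash <= 0 ? "" : p.substring(0, slash);
        }
        return true;
    }

    @Override
    boolean exists(String path) { return paths.contains(path); }

    @Override
    boolean delete(String path) {
        // Remove the path and everything below it (recursive delete)
        return paths.removeIf(p -> p.equals(path) || p.startsWith(path + "/"));
    }
}
```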

package org.apache.hadoop.fs;

import ....

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured implements Closeable {

  // "fs.defaultFS"
  public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
  // "file:///"
  public static final String DEFAULT_FS = CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;


  /**
   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
   */
  public boolean mkdirs(Path f) throws IOException {
    return mkdirs(f, FsPermission.getDirDefault());
  }

  /** create a directory with the provided permission
   * The permission of the directory is set to be the provided permission as in
   * setPermission, not permission&~umask
   * 
   * @see #create(FileSystem, Path, FsPermission)
   * 
   * @param fs file system handle
   * @param dir the name of the directory to be created
   * @param permission the permission of the directory
   * @return true if the directory creation succeeds; false otherwise
   * @throws IOException
   */
  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
  throws IOException {
    // create the directory using the default permission
    boolean result = fs.mkdirs(dir);
    // set its permission to be the supplied one
    fs.setPermission(dir, permission);
    return result;
  }

}
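The two mkdirs variants differ in how the final mode is computed. Assuming the usual behavior that the file system clears the client umask bits (fs.permissions.umask-mode, 022 by default) from the requested mode, mkdirs(Path) with the default directory permission 0777 ends up as 0755, whereas the static mkdirs(fs, dir, permission) calls setPermission() afterwards, so the directory ends up with exactly the supplied permission. PermMath is a hypothetical helper that only does the bit arithmetic.

```java
// Hypothetical helper illustrating the permission arithmetic (not a Hadoop class)
final class PermMath {
    private PermMath() {}

    // Mode produced when the umask is applied: requested bits with umask bits cleared
    static int masked(int requested, int umask) {
        return requested & ~umask;
    }

    // Render a 9-bit mode as the familiar rwx string, e.g. 0755 -> "rwxr-xr-x"
    static String toRwx(int mode) {
        String letters = "rwxrwxrwx";
        char[] out = new char[9];
        for (int i = 0; i < 9; i++) {
            out[i] = ((mode >> (8 - i)) & 1) == 1 ? letters.charAt(i) : '-';
        }
        return new String(out);
    }
}
```

With the static helper there is no masking step to model: setPermission() overwrites whatever mkdirs produced, which is exactly what the javadoc means by "not permission&~umask".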

2.2 DistributedFileSystem, a subclass of FileSystem

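DistributedFileSystem is the implementation of the abstract FileSystem class for the distributed file system (DFS). End-user code interacts with HDFS through this class.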

package org.apache.hadoop.hdfs;
import ...
/****************************************************************
 * Implementation of the abstract FileSystem for the DFS system.
 * This object is the way end-user code interacts with a Hadoop
 * DistributedFileSystem.
 *
 *****************************************************************/
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
@InterfaceStability.Unstable
public class DistributedFileSystem extends FileSystem {
  private Path workingDir;
  private URI uri;
  //  "/user"
  private String homeDirPrefix = DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;

  ...
}

2.3 How a FileSystem object is created

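Hadoop supports many file systems. So how does Hadoop end up with an actual DistributedFileSystem instance behind a FileSystem reference? Below we walk through the creation process step by step in the source code.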

(1) A small program that creates a FileSystem

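import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyToHdfs { // class name is illustrative
    public static void main(String[] args) throws Exception {
        // local file path
        String local = "D:\\word2.txt";
        String dest = "hdfs://192.168.80.131:9000/user/root/input/word2.txt";
        Configuration cfg = new Configuration();
        // obtain the FileSystem for dest, acting as user "root"
        FileSystem fs = FileSystem.get(URI.create(dest), cfg, "root");
        fs.copyFromLocalFile(new Path(local), new Path(dest));
        fs.close();
    }
}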

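(2) Start from the get() call in this program. Stepping into FileSystem.get(final URI uri, final Configuration conf, final String user), we see that it resolves a UserGroupInformation for the given user (using the Kerberos ticket cache path if one is configured) and then, inside ugi.doAs(), calls get(URI uri, Configuration conf).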

  /**
   * Get a filesystem instance based on the uri, the passed
   * configuration and the user
   * @param uri of the filesystem
   * @param conf the configuration to use
   * @param user to perform the get as
   * @return the filesystem instance
   * @throws IOException
   * @throws InterruptedException
   */
  public static FileSystem get(final URI uri, final Configuration conf,
        final String user) throws IOException, InterruptedException {
    String ticketCachePath =
      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
    UserGroupInformation ugi =
        UserGroupInformation.getBestUGI(ticketCachePath, user);
    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
      @Override
      public FileSystem run() throws IOException {
        return get(uri, conf);
      }
    });
  }
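The doAs() call means that get(uri, conf) runs with the resolved user as the current identity, which matters later because the cache key includes that user. As an analogy only, the sketch below binds a user name to the current thread; RunAs is hypothetical, and the real UserGroupInformation.doAs() works through a JAAS Subject rather than a ThreadLocal.

```java
import java.util.concurrent.Callable;

// Hypothetical analogy for UserGroupInformation.doAs(): run an action with a
// "current user" bound to the thread, so code inside can look it up.
final class RunAs {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private RunAs() {}

    static String currentUser() {
        String u = CURRENT.get();
        // Outside any doAs(), fall back to the OS login name
        return u != null ? u : System.getProperty("user.name");
    }

    static <T> T doAs(String user, Callable<T> action) throws Exception {
        String previous = CURRENT.get();
        CURRENT.set(user);
        try {
            return action.call();
        } finally {
            // Restore whatever identity was active before
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        }
    }
}
```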

(3) Step into get(URI uri, Configuration conf). As the code below shows, get() does not necessarily create a new FileSystem object on every call; by default it returns one from a cache.

  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();
    //No scheme and no authority: use the file system named by fs.defaultFS (file:/// if it is not set)
    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }
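    //Scheme but no authority: if the scheme matches that of fs.defaultFS (file:/// if unset) and the default URI has an authority, use the default URI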
    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }
    //If fs.<scheme>.impl.disable.cache is true, caching is disabled and createFileSystem() builds a new instance; the default (false) uses the cache
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }
    //The conf in our example does not set the disable-cache property, so execution reaches CACHE.get()
    return CACHE.get(uri, conf);
  }
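The branching above can be restated as a pure function, which makes the individual cases easy to check. In this sketch GetDispatch is hypothetical and a Map<String, String> stands in for Configuration; it only decides which URI is used and whether the cache is bypassed, without creating anything.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical restatement of the branching in FileSystem.get(URI, Configuration)
final class GetDispatch {
    private GetDispatch() {}

    // Like getDefaultUri(conf): the value of fs.defaultFS, or file:/// if unset
    static URI defaultUri(Map<String, String> conf) {
        return URI.create(conf.getOrDefault("fs.defaultFS", "file:///"));
    }

    // Which URI the lookup ends up using
    static URI effectiveUri(URI uri, Map<String, String> conf) {
        if (uri.getScheme() == null && uri.getAuthority() == null) {
            return defaultUri(conf);              // no scheme, no authority: default FS
        }
        if (uri.getScheme() != null && uri.getAuthority() == null) {
            URI def = defaultUri(conf);
            if (uri.getScheme().equals(def.getScheme()) && def.getAuthority() != null) {
                return def;                       // same scheme: borrow the default authority
            }
        }
        return uri;                               // otherwise the URI is used as given
    }

    // true -> createFileSystem() directly; false -> CACHE.get()
    static boolean bypassCache(String scheme, Map<String, String> conf) {
        String key = String.format("fs.%s.impl.disable.cache", scheme);
        return Boolean.parseBoolean(conf.getOrDefault(key, "false"));
    }
}
```

For example, with fs.defaultFS set to hdfs://nn:9000, both /user/root and hdfs:///user/root resolve to hdfs://nn:9000, while hdfs://other:8020/x is used unchanged.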

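(4) Step into CACHE.get(URI uri, Configuration conf). CACHE is a static field of FileSystem whose type, Cache, is a nested class of FileSystem. In this get() method, uri and conf are used to build a Key, which records the URI's scheme and authority together with the current user's identity; the Key is then passed to getInternal().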

  /** Caching FileSystem objects */
  static class Cache {
    private final ClientFinalizer clientFinalizer = new ClientFinalizer();

    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
    private final Set<Key> toAutoClose = new HashSet<Key>();

    /** A variable that makes all objects in the cache unique */
    private static AtomicLong unique = new AtomicLong(1);

    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
    }
    ...
    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      FileSystem fs;
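      //This is the caching mechanism: on a user's first call the map is empty; when the same user calls again for the same URI,
      //the file system is taken straight from the map, skipping initialization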
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }
      //Core code that creates the file system
      fs = createFileSystem(uri, conf);
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty()
                && !ShutdownHookManager.get().isShutdownInProgress()) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        fs.key = key;
        //On the first call the map is empty; the key built in get() and the new file system are stored in the map as a key-value pair
        map.put(key, fs);
        if (conf.getBoolean("fs.automatic.close", true)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }
    ...
  }
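The essential pattern in getInternal() is: look up under the lock, create the instance without holding the lock, then re-check and close the new instance if another thread got there first. FsKey and FsCache below are hypothetical simplifications: the key holds scheme, authority and a plain user-name string (lowercasing scheme and authority, which is assumed to match the real Key), and the shutdown-hook and auto-close bookkeeping are omitted.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical cache key: scheme + authority + user
final class FsKey {
    final String scheme, authority, user;

    FsKey(String scheme, String authority, String user) {
        this.scheme = scheme == null ? "" : scheme.toLowerCase();
        this.authority = authority == null ? "" : authority.toLowerCase();
        this.user = user;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof FsKey)) return false;
        FsKey k = (FsKey) o;
        return scheme.equals(k.scheme) && authority.equals(k.authority) && user.equals(k.user);
    }

    @Override public int hashCode() { return Objects.hash(scheme, authority, user); }
}

// Same locking pattern as getInternal()
final class FsCache<T extends Closeable> {
    private final Map<FsKey, T> map = new HashMap<>();

    T get(FsKey key, Supplier<T> factory) throws IOException {
        T fs;
        synchronized (this) {
            fs = map.get(key);
        }
        if (fs != null) {
            return fs;                  // cache hit
        }
        fs = factory.get();             // potentially slow, so done without the lock
        synchronized (this) {
            T old = map.get(key);
            if (old != null) {
                fs.close();             // lost the race: keep the existing instance
                return old;
            }
            map.put(key, fs);
            return fs;
        }
    }
}
```

Creating outside the lock keeps one slow initialization (for HDFS, building the DFSClient) from blocking lookups for other keys, at the cost of occasionally creating and discarding a duplicate.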

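(5) Step into getInternal(URI uri, Configuration conf, Key key). This method also belongs to the nested Cache class, and its code is included in the listing in step (4): on a cache miss it calls createFileSystem(), then re-checks the map under the lock before inserting the new instance.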

(6) Step into createFileSystem(URI uri, Configuration conf), which is responsible for creating the concrete file system instance.

  private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    //Look up the FileSystem implementation class for the URI's scheme (for an hdfs:// URI this is DistributedFileSystem)
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
    //Create the file system object reflectively from that class
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    //The fs object above is not yet usable; initialize() must be called to set it up
    fs.initialize(uri, conf);
    return fs;
  }
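createFileSystem() is a small plug-in mechanism: map the scheme to a class, instantiate it reflectively, then call initialize(). The sketch below reproduces that shape with a hypothetical registry; PluggableFs, FakeHdfs, FakeLocalFs and FsFactory are illustrative names, and the registry map stands in for the scheme-to-class lookup done by getFileSystemClass().

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal base class; the real FileSystem.initialize() does much more
abstract class PluggableFs {
    protected URI uri;
    void initialize(URI uri) { this.uri = uri; }
}

class FakeHdfs extends PluggableFs { }     // stand-in for DistributedFileSystem
class FakeLocalFs extends PluggableFs { }  // stand-in for a local file system

final class FsFactory {
    // Stand-in for the scheme -> implementation class lookup
    private final Map<String, Class<? extends PluggableFs>> impls = new HashMap<>();

    void register(String scheme, Class<? extends PluggableFs> clazz) {
        impls.put(scheme, clazz);
    }

    PluggableFs create(URI uri) throws Exception {
        Class<? extends PluggableFs> clazz = impls.get(uri.getScheme());
        if (clazz == null) {
            throw new IllegalArgumentException("No FileSystem for scheme: " + uri.getScheme());
        }
        // Reflection: the caller never names the concrete class
        PluggableFs fs = clazz.getDeclaredConstructor().newInstance();
        // A freshly constructed object is "empty"; initialize() binds it to the URI
        fs.initialize(uri);
        return fs;
    }
}
```

The design keeps FileSystem itself free of any compile-time dependency on its subclasses: adding a backend only means registering another class for a new scheme.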

(7) Step into initialize(URI uri, Configuration conf). Because the object being initialized here is a DistributedFileSystem, the method to read is DistributedFileSystem's own initialize(URI uri, Configuration conf), shown below.

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);
    //Get the NameNode host name
    String host = uri.getHost();
    if (host == null) {
      throw new IOException("Incomplete HDFS URI, no host: "+ uri);
    }
    homeDirPrefix = conf.get(
        DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,    //"dfs.user.home.dir.prefix"
        DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);// "/user"
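    //Initialize dfs, uri and workingDir
    //The most important of these is dfs, a DFSClient. As its name suggests, it is a client responsible for communicating with the NameNode;
    //internally it holds an RPC proxy object used to fetch information from the NameNode remotely. It is a complex object.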
    this.dfs = new DFSClient(uri, conf, statistics);
    this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
    this.workingDir = getHomeDirectory();
  }
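Two lines of initialize() are easy to check by hand: the stored uri keeps only the scheme and authority, and workingDir starts at the home directory. The sketch below assumes the home directory is the file system URI followed by dfs.user.home.dir.prefix, a slash and the short user name; DfsInitSketch is hypothetical and throws IllegalArgumentException where the real code throws IOException.

```java
import java.net.URI;

// Hypothetical helpers restating two steps of DistributedFileSystem.initialize()
final class DfsInitSketch {
    private DfsInitSketch() {}

    // Mirrors this.uri = URI.create(scheme + "://" + authority): path, query and fragment are dropped
    static URI normalize(URI uri) {
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("Incomplete HDFS URI, no host: " + uri);
        }
        return URI.create(uri.getScheme() + "://" + uri.getAuthority());
    }

    // Assumed shape of the home directory: <fs uri> + homeDirPrefix + "/" + user
    static String homeDirectory(URI fsUri, String homeDirPrefix, String shortUserName) {
        return fsUri + homeDirPrefix + "/" + shortUserName;
    }
}
```

With the URI from the example program and the default prefix /user, this yields hdfs://192.168.80.131:9000 as the file system URI and hdfs://192.168.80.131:9000/user/root as the initial working directory.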
