前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hadoop源码分析:FileSystem类

Hadoop源码分析:FileSystem类

作者头像
程裕强
发布2018-01-02 16:51:26
1.4K0
发布2018-01-02 16:51:26
举报

1、org.apache.hadoop.conf包

org.apache.hadoop.conf包位于hadoop-common模块下

这里写图片描述
这里写图片描述

1.1 Configurable 接口

package org.apache.hadoop.conf;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** Something that may be configured with a {@link Configuration}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Configurable {

  /** Set the configuration to be used by this object. */
  void setConf(Configuration conf);

  /** Return the configuration used by this object. */
  Configuration getConf();
}

1.2 Configured类

package org.apache.hadoop.conf;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** Base class for things that may be configured with a {@link Configuration}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Configured implements Configurable {

  private Configuration conf;

  /** Construct a Configured. */
  public Configured() {
    this(null);
  }

  /** Construct a Configured. */
  public Configured(Configuration conf) {
    setConf(conf);
  }

  // inherit javadoc
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  // inherit javadoc
  @Override
  public Configuration getConf() {
    return conf;
  }

}

1.3 Configuration类

package org.apache.hadoop.conf;

import ...

@InterfaceAudience.Public
@InterfaceStability.Stable
public class Configuration implements Iterable<Map.Entry<String,String>>,
                                      Writable {
  ...

  private static class Resource {
    private final Object resource;
    private final String name;

    public Resource(Object resource) {
      this(resource, resource.toString());
    }

    public Resource(Object resource, String name) {
      this.resource = resource;
      this.name = name;
    }

    public String getName(){
      return name;
    }

    public Object getResource() {
      return resource;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  /** 
   * Set the <code>value</code> of the <code>name</code> property. If 
   * <code>name</code> is deprecated or there is a deprecated name associated to it,
   * it sets the value to both names. Name will be trimmed before put into
   * configuration.
   * 
   * @param name property name.
   * @param value property value.
   */
  public void set(String name, String value) {
    set(name, value, null);
  }

  /**
   * Add a configuration resource. 
   * 
   * The properties of this resource will override properties of previously 
   * added resources, unless they were marked <a href="#Final">final</a>. 
   * 
   * @param name resource to be added, the classpath is examined for a file 
   *             with that name.
   */
  public void addResource(String name) {
    addResourceObject(new Resource(name));
  }

2、org.apache.hadoop.fs包

org.apache.hadoop.fs包位于hadoop-common模块下

2.1 FileSystem

Hadoop有1个抽象的文件系统概念,HDFS只是其中一个实现。该抽象文件系统由抽象类org.apache.hadoop.fs.FileSystem 定义,该类继承了org.apache.hadoop.conf.Configured类,并实现了java.io.Closeable接口。 该抽象类类提供了丰富的方法用于对文件系统进行操作,比如创建目录、删除文件、重命名等。

package org.apache.hadoop.fs;

import ....

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured implements Closeable {

  //"fs.defaultFS"
  public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
  //"file:///";
  public static final String DEFAULT_FS = CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;


  /**
   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
   */
  public boolean mkdirs(Path f) throws IOException {
    return mkdirs(f, FsPermission.getDirDefault());
  }

  /** create a directory with the provided permission
   * The permission of the directory is set to be the provided permission as in
   * setPermission, not permission&~umask
   * 
   * @see #create(FileSystem, Path, FsPermission)
   * 
   * @param fs file system handle
   * @param dir the name of the directory to be created
   * @param permission the permission of the directory
   * @return true if the directory creation succeeds; false otherwise
   * @throws IOException
   */
  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
  throws IOException {
    // create the directory using the default permission
    boolean result = fs.mkdirs(dir);
    // set its permission to be the supplied one
    fs.setPermission(dir, permission);
    return result;
  }

}

2.2 FileSystem的子类DistributedFileSystem

FileSystem抽象类的一个针对于分布式文件系统的实现子类,该类实现了DFS系统,通过该类用户代码与HDFS交互。

package org.apache.hadoop.hdfs;
import ...
/****************************************************************
 * Implementation of the abstract FileSystem for the DFS system.
 * This object is the way end-user code interacts with a Hadoop
 * DistributedFileSystem.
 *
 *****************************************************************/
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
@InterfaceStability.Unstable
public class DistributedFileSystem extends FileSystem {
  private Path workingDir;
  private URI uri;
  //  "/user"
  private String homeDirPrefix = DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;

  ...
}

2.3 FileSystem对象创建过程

Hadoop支持多钟文件系统,那么Hadoop是如何通过FileSystem类引用实际的DistributedFileSystem文件系统的呢,下面我们将通过源码逐步分析这个创建过程。

(1)创建FileSystem的小程序

public static void main(String[] args) throws Exception{
        //本地文件路径
        String local="D:\\word2.txt";
        String dest="hdfs://192.168.80.131:9000/user/root/input/word2.txt";
        Configuration cfg=new Configuration();
        FileSystem fs=  FileSystem.get(URI.create(dest),cfg,"root");
        fs.copyFromLocalFile(new Path(local), new Path(dest));
        fs.close();
    } 

(2)从该程序中的get()方法入手 下面进入FileSystem类的get(final URI uri, final Configuration conf, final String user)方法,发现调用get(URI uri, Configuration conf)方法

  /**
   * Get a filesystem instance based on the uri, the passed
   * configuration and the user
   * @param uri of the filesystem
   * @param conf the configuration to use
   * @param user to perform the get as
   * @return the filesystem instance
   * @throws IOException
   * @throws InterruptedException
   */
  public static FileSystem get(final URI uri, final Configuration conf,
        final String user) throws IOException, InterruptedException {
    String ticketCachePath =
      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
    UserGroupInformation ugi =
        UserGroupInformation.getBestUGI(ticketCachePath, user);
    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
      @Override
      public FileSystem run() throws IOException {
        return get(uri, conf);
      }
    });
  }

(3)进入get(URI uri, Configuration conf)方法 从下面的代码可以得知,get方法不是每次都创建FileSystem对象,会从缓存中获取FileSystem对象。

  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();
    //根据fs.defaultFS的值获取文件系统,若未设置该参数则根据file:///返回文件件系统
    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }
    //根据fs.defaultFS的值获取URI,若未设置则使用file:///创建URI
    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }
    //disableCacheName是conf中关于禁用缓存的配置,若该项配置false,则表示使用缓存,进入createFileSystem()
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }
    //本配置文件conf中并未配置关于缓存的信息,所以进入CACHE.get()方法
    return CACHE.get(uri, conf);
  }

(4)进入CACHE.get(URI uri, Configuration conf)方法 发现CACHE为FileSystem的一个内部类。在该get()方法中,uri和conf被放在了一个key中,key中存储着用户身份信息和访问的系统信息。

  /** Caching FileSystem objects */
  static class Cache {
    private final ClientFinalizer clientFinalizer = new ClientFinalizer();

    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
    private final Set<Key> toAutoClose = new HashSet<Key>();

    /** A variable that makes all objects in the cache unique */
    private static AtomicLong unique = new AtomicLong(1);

    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
    }
    ...
    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      FileSystem fs;
      //此处相当于缓存机制,当用户第一次进入该方法,map空;该用户再次进入该方法,并访问同一个uri
      //则文件系统直接从map中获取,免去再次初始化的过程
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }
      //创建文件系统的核心代码
      fs = createFileSystem(uri, conf);
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty()
                && !ShutdownHookManager.get().isShutdownInProgress()) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        fs.key = key;
        //用户第一次进来,map空,在此处为map赋值,上个方法中的key与相应的文件系统作为键值对存入map
        map.put(key, fs);
        if (conf.getBoolean("fs.automatic.close", true)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }
    ...
  }

(5)进入getInternal(URI uri, Configuration conf, Key key)方法 该方法内部类Cache中。

(6)进入createFileSystem(URI uri, Configuration conf)方法 该负责创建具体的文件系统实例

  private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    //根据conf中配置的fs.defaultFS的值来获取相应的文件系统对象的class文件,即DFS的字节码文件
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
    //通过反射机制,利用上面的class文件,创建相应的文件系统对象
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    //上面的fs对象仅是一个空系统,需要调用initialize()进行初始化
    fs.initialize(uri, conf);
    return fs;
  }

(7)进入initialize(URI uri, Configuration conf)方法 注意,由于此处是对DistributedFileSystem进行初始化,所以一定要查看DistributedFileSystem类的initialize(URI uri, Configuration conf)方法

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);
    //获取NameNode主机名
    String host = uri.getHost();
    if (host == null) {
      throw new IOException("Incomplete HDFS URI, no host: "+ uri);
    }
    homeDirPrefix = conf.get(
        DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,    //"dfs.user.home.dir.prefix"
        DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);// "/user"
    //对dfs、url和workingDir进行初始化
    //最重要的是dfs,这是一个DFSClint,从名字可以看出是一个客户端,负责与NameNode通信,
    // 他的内部有一个RPC代理对象,负责远程获取NameNode上的信息。这是一个复杂的对象。
    this.dfs = new DFSClient(uri, conf, statistics);
    this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
    this.workingDir = getHomeDirectory();
  }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-11-26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、org.apache.hadoop.conf包
    • 1.1 Configurable 接口
      • 1.2 Configured类
        • 1.3 Configuration类
        • 2、org.apache.hadoop.fs包
          • 2.1 FileSystem
            • 2.2 FileSystem的子类DistributedFileSystem
              • 2.3 FileSystem对象创建过程
              相关产品与服务
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档