1.1 The Configurable interface

package org.apache.hadoop.conf;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** Something that may be configured with a {@link Configuration}. */
public interface Configurable {

  /** Set the configuration to be used by this object. */
  void setConf(Configuration conf);

  /** Return the configuration used by this object. */
  Configuration getConf();
}

1.2 The Configured class

package org.apache.hadoop.conf;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** Base class for things that may be configured with a {@link Configuration}. */
public class Configured implements Configurable {

  private Configuration conf;

  /** Construct a Configured. */
  public Configured() {
    this(null);
  }

  /** Construct a Configured. */
  public Configured(Configuration conf) {
    setConf(conf);
  }

  // inherit javadoc
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  // inherit javadoc
  @Override
  public Configuration getConf() {
    return conf;
  }
}
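
The value of Configured is that any subclass gets configuration storage for free and can read settings through the inherited getConf(). The following is a minimal, self-contained sketch of that pattern. It does not use Hadoop: MiniConf, MiniConfigurable, MiniConfigured, BufferedCopier and the property name copier.buffer.size are all hypothetical stand-ins invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for Configuration: a plain string-to-string map. */
class MiniConf {
  private final Map<String, String> props = new HashMap<>();

  void set(String name, String value) {
    props.put(name.trim(), value);
  }

  String get(String name, String defaultValue) {
    return props.getOrDefault(name, defaultValue);
  }
}

/** Same shape as Configurable, but against the stand-in type. */
interface MiniConfigurable {
  void setConf(MiniConf conf);
  MiniConf getConf();
}

/** Same shape as Configured: stores the configuration once for all subclasses. */
class MiniConfigured implements MiniConfigurable {
  private MiniConf conf;

  MiniConfigured(MiniConf conf) {
    setConf(conf);
  }

  @Override
  public void setConf(MiniConf conf) {
    this.conf = conf;
  }

  @Override
  public MiniConf getConf() {
    return conf;
  }
}

/** Hypothetical component that reads a setting through the inherited getConf(). */
class BufferedCopier extends MiniConfigured {
  BufferedCopier(MiniConf conf) {
    super(conf);
  }

  int bufferSize() {
    // Falls back to 4096 when the property is not set.
    return Integer.parseInt(getConf().get("copier.buffer.size", "4096"));
  }
}
```

FileSystem, discussed in section 2, relies on the same arrangement: it extends Configured and therefore carries its Configuration with it.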


1.3 The Configuration class

package org.apache.hadoop.conf;

import ...

public class Configuration implements Iterable<Map.Entry<String,String>>,
                                      Writable {

  private static class Resource {
    private final Object resource;
    private final String name;

    public Resource(Object resource) {
      this(resource, resource.toString());
    }

    public Resource(Object resource, String name) {
      this.resource = resource;
      this.name = name;
    }

    public String getName(){
      return name;
    }

    public Object getResource() {
      return resource;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  // ...

  /**
   * Set the <code>value</code> of the <code>name</code> property. If 
   * <code>name</code> is deprecated or there is a deprecated name associated to it,
   * it sets the value to both names. Name will be trimmed before put into
   * configuration.
   * @param name property name.
   * @param value property value.
   */
  public void set(String name, String value) {
    set(name, value, null);
  }

  /**
   * Add a configuration resource. 
   * The properties of this resource will override properties of previously 
   * added resources, unless they were marked <a href="#Final">final</a>. 
   * @param name resource to be added, the classpath is examined for a file 
   *             with that name.
   */
  public void addResource(String name) {
    addResourceObject(new Resource(name));
  }

  // ...
}
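
The Javadoc of addResource states the merge rule: a later resource overrides earlier ones unless a property was marked final. The sketch below models only that rule with plain maps. It is not Hadoop's implementation; the class LayeredProperties, its method signatures and the property names used in main are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Simplified model of "later resources win, unless the property is final". */
class LayeredProperties {
  private final Map<String, String> values = new LinkedHashMap<>();
  private final Set<String> finalNames = new HashSet<>();

  /**
   * Merge one resource. Properties already marked final are left untouched;
   * names listed in finalInThisResource become locked after this merge.
   */
  void addResource(Map<String, String> resource, Set<String> finalInThisResource) {
    for (Map.Entry<String, String> entry : resource.entrySet()) {
      String name = entry.getKey().trim();
      if (!finalNames.contains(name)) {
        values.put(name, entry.getValue());
      }
    }
    for (String name : finalInThisResource) {
      finalNames.add(name.trim());
    }
  }

  String get(String name) {
    return values.get(name.trim());
  }

  public static void main(String[] args) {
    LayeredProperties props = new LayeredProperties();
    // First resource: block.size is marked final, replication is not.
    props.addResource(Map.of("replication", "3", "block.size", "128m"), Set.of("block.size"));
    // Second resource tries to override both properties.
    props.addResource(Map.of("replication", "1", "block.size", "64m"), Set.of());
    System.out.println("replication = " + props.get("replication"));
    System.out.println("block.size  = " + props.get("block.size"));
  }
}
```

In this model the second resource changes replication but cannot change block.size, which is the behaviour the Javadoc describes for final properties.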



2.1 FileSystem

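Hadoop has an abstract notion of a file system, and HDFS is only one implementation of it. The abstraction is defined by the abstract class org.apache.hadoop.fs.FileSystem, which extends org.apache.hadoop.conf.Configured and implements the java.io.Closeable interface. The class provides a rich set of methods for operating on a file system, such as creating directories, deleting files, and renaming.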

package org.apache.hadoop.fs;

import ....

public abstract class FileSystem extends Configured implements Closeable {

  public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
  public static final String DEFAULT_FS = CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;

  /**
   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
   */
  public boolean mkdirs(Path f) throws IOException {
    return mkdirs(f, FsPermission.getDirDefault());
  }

  /** create a directory with the provided permission
   * The permission of the directory is set to be the provided permission as in
   * setPermission, not permission&~umask
   * @see #create(FileSystem, Path, FsPermission)
   * @param fs file system handle
   * @param dir the name of the directory to be created
   * @param permission the permission of the directory
   * @return true if the directory creation succeeds; false otherwise
   * @throws IOException
   */
  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
  throws IOException {
    // create the directory using the default permission
    boolean result = fs.mkdirs(dir);
    // set its permission to be the supplied one
    fs.setPermission(dir, permission);
    return result;
  }

  // ...
}
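
The Javadoc of the static mkdirs distinguishes between the permission exactly as supplied and permission&~umask. A short arithmetic sketch of what that masking means follows; the requested mode 0777 and the umask 022 are illustrative values, and UmaskDemo is a hypothetical class that does not touch any Hadoop API.

```java
/** Plain bit arithmetic showing the effect of permission & ~umask. */
class UmaskDemo {

  /** Permission bits that remain after a umask is applied to a requested mode. */
  static int applyUmask(int requested, int umask) {
    return requested & ~umask;
  }

  public static void main(String[] args) {
    int requested = 0777; // octal literal: rwxrwxrwx
    int umask = 022;      // octal literal: clear the write bit for group and others

    // prints "exact:  777"
    System.out.println("exact:  " + Integer.toOctalString(requested));
    // prints "masked: 755"
    System.out.println("masked: " + Integer.toOctalString(applyUmask(requested, umask)));
  }
}
```

With these values the masked result is 0755, whereas the static mkdirs, according to its Javadoc, leaves the directory with the supplied 0777 because it calls setPermission after creating the directory.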


2.2 DistributedFileSystem, a subclass of FileSystem


package org.apache.hadoop.hdfs;
import ...
/**
 * Implementation of the abstract FileSystem for the DFS system.
 * This object is the way end-user code interacts with a Hadoop
 * DistributedFileSystem.
 */
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
public class DistributedFileSystem extends FileSystem {
  private Path workingDir;
  private URI uri;
  // default value: "/user"
  private String homeDirPrefix = DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;

  // ...
}


2.3 How a FileSystem object is created



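(1) Start from a small client program that copies a local file to HDFS.

public static void main(String[] args) throws Exception {
    String local = "D:\\word2.txt";
    String dest = "hdfs://";
    Configuration cfg = new Configuration();
    // Obtain the FileSystem for dest, acting as user "root"
    FileSystem fs = FileSystem.get(URI.create(dest), cfg, "root");
    // Upload the local file to the destination path
    fs.copyFromLocalFile(new Path(local), new Path(dest));
}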

(2) Start from the get() call in this program. Stepping into FileSystem's get(final URI uri, final Configuration conf, final String user) method shows that it delegates to get(URI uri, Configuration conf).

  /**
   * Get a filesystem instance based on the uri, the passed
   * configuration and the user
   * @param uri of the filesystem
   * @param conf the configuration to use
   * @param user to perform the get as
   * @return the filesystem instance
   * @throws IOException
   * @throws InterruptedException
   */
  public static FileSystem get(final URI uri, final Configuration conf,
        final String user) throws IOException, InterruptedException {
    String ticketCachePath =
        conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
    UserGroupInformation ugi =
        UserGroupInformation.getBestUGI(ticketCachePath, user);
    // Run the two-argument get() with the identity of the given user
    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
      @Override
      public FileSystem run() throws IOException {
        return get(uri, conf);
      }
    });
  }

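(3) Step into get(URI uri, Configuration conf). The code below shows that get does not create a new FileSystem object on every call: unless caching is disabled for the scheme, it returns the instance held in a cache.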

  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }

    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }

    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }
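
The branching in get(URI, Configuration) depends entirely on which parts java.net.URI reports for the given string. The following sketch reproduces only that classification, using the JDK and no Hadoop classes. UriDispatchDemo, its method names and the branch labels are hypothetical, and the host namenode:8020 is an assumed example address.

```java
import java.net.URI;

/** Shows which branch a URI would take, based on its scheme and authority. */
class UriDispatchDemo {

  static String classify(URI uri) {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();
    if (scheme == null && authority == null) {
      return "default-fs";      // e.g. "/tmp/a.txt": fall back to the default file system
    }
    if (scheme != null && authority == null) {
      return "scheme-only";     // e.g. "hdfs:///tmp/a.txt": may borrow the default authority
    }
    return "cache-or-create";   // e.g. "hdfs://namenode:8020/tmp/a.txt"
  }

  /** Builds the per-scheme property name that switches the cache off. */
  static String disableCacheKey(String scheme) {
    return String.format("fs.%s.impl.disable.cache", scheme);
  }

  public static void main(String[] args) {
    String[] samples = {"/tmp/a.txt", "hdfs:///tmp/a.txt", "hdfs://namenode:8020/tmp/a.txt"};
    for (String sample : samples) {
      System.out.println(sample + " -> " + classify(URI.create(sample)));
    }
    System.out.println(disableCacheKey("hdfs"));
  }
}
```

Note that in the real method the scheme-only case falls through to the cache lookup when the default URI has a different scheme or no authority; the sketch labels the case but does not model that second check.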

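(4) Step into CACHE.get(URI uri, Configuration conf). CACHE is an instance of Cache, a static nested class of FileSystem. In this get() method, uri and conf are wrapped into a Key, which records the identity of the user and the file system being accessed.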

  /** Caching FileSystem objects */
  static class Cache {
    private final ClientFinalizer clientFinalizer = new ClientFinalizer();

    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
    private final Set<Key> toAutoClose = new HashSet<Key>();

    /** A variable that makes all objects in the cache unique */
    private static AtomicLong unique = new AtomicLong(1);

    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
    }

    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      FileSystem fs;
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }

      fs = createFileSystem(uri, conf);
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty()
                && !ShutdownHookManager.get().isShutdownInProgress()) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        fs.key = key;
        map.put(key, fs);
        if (conf.getBoolean("fs.automatic.close", true)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }

    // ...
  }

(5) Step into getInternal(URI uri, Configuration conf, Key key). This method belongs to the nested class Cache; its body appears in the listing above.
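
getInternal follows a check, create, re-check sequence: it looks up the key under the lock, builds the instance outside the lock, then takes the lock again and discards its own instance if another thread got there first. A minimal sketch of that sequence in plain Java is given below. Handle and HandleCache are hypothetical names, a String replaces Hadoop's Key, and the shutdown hook and auto-close bookkeeping are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical cached resource; stands in for a FileSystem instance. */
class Handle {
  final String key;
  boolean closed;

  Handle(String key) {
    this.key = key;
  }

  void close() {
    closed = true;
  }
}

/** Cache that creates values outside the lock and re-checks before inserting. */
class HandleCache {
  private final Map<String, Handle> map = new HashMap<>();
  private final Function<String, Handle> factory;

  HandleCache(Function<String, Handle> factory) {
    this.factory = factory;
  }

  Handle get(String key) {
    Handle cached;
    synchronized (this) {
      cached = map.get(key);           // first check, under the lock
    }
    if (cached != null) {
      return cached;
    }

    Handle created = factory.apply(key); // possibly slow, so done without the lock

    synchronized (this) {
      Handle winner = map.get(key);    // second check: another thread may have inserted
      if (winner != null) {
        created.close();               // discard our duplicate
        return winner;
      }
      map.put(key, created);
      return created;
    }
  }
}
```

The design choice here is to avoid holding the lock while the instance is being created, at the cost of occasionally creating and closing a duplicate.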

(6) Step into createFileSystem(URI uri, Configuration conf). This method is responsible for creating the concrete file system instance.

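  private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    // Look up the implementation class registered for the URI's scheme
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
    // Instantiate it reflectively, then let it initialize itself from the URI
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
  }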
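
createFileSystem works in two phases: pick a class by scheme and instantiate it reflectively, then call initialize on the fresh object. The sketch below shows the same two phases with JDK reflection only. Store, MemoryStore, StoreFactory and the mem scheme are hypothetical, and an in-memory registry replaces the configuration lookup that getFileSystemClass performs in Hadoop.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical pluggable type; initialize() plays the role of FileSystem.initialize. */
interface Store {
  void initialize(URI uri);
  String describe();
}

/** Hypothetical implementation with an implicit no-argument constructor. */
class MemoryStore implements Store {
  private URI uri;

  @Override
  public void initialize(URI uri) {
    this.uri = uri;
  }

  @Override
  public String describe() {
    return "MemoryStore@" + uri;
  }
}

/** Maps a URI scheme to a class, instantiates it reflectively, then initializes it. */
class StoreFactory {
  private final Map<String, Class<? extends Store>> registry = new HashMap<>();

  void register(String scheme, Class<? extends Store> clazz) {
    registry.put(scheme, clazz);
  }

  Store create(URI uri) {
    Class<? extends Store> clazz = registry.get(uri.getScheme());
    if (clazz == null) {
      throw new IllegalArgumentException("No store registered for scheme: " + uri.getScheme());
    }
    try {
      Store store = clazz.getDeclaredConstructor().newInstance(); // phase 1: construct
      store.initialize(uri);                                      // phase 2: initialize
      return store;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
    }
  }
}
```

Separating construction from initialization is what allows the factory to stay ignorant of the concrete class: it needs only a no-argument constructor and the shared initialize method.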

(7) Step into initialize(URI uri, Configuration conf). Note that the object being initialized here is a DistributedFileSystem, so the method to read is the initialize(URI uri, Configuration conf) override in the DistributedFileSystem class.

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);

    String host = uri.getHost();
    if (host == null) {
      throw new IOException("Incomplete HDFS URI, no host: "+ uri);
    }
    homeDirPrefix = conf.get(
        DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,      // "dfs.user.home.dir.prefix"
        DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT); // "/user"

    // DFSClient holds an RPC proxy that fetches information from the NameNode
    // remotely. It is a complex object.
    this.dfs = new DFSClient(uri, conf, statistics);
    // Keep only scheme and authority as this file system's own URI
    this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
    this.workingDir = getHomeDirectory();
  }
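
Two URI operations in initialize are easy to try in isolation: the host check and the reduction of the full URI to scheme plus authority. The sketch below reproduces them with java.net.URI only. BaseUriDemo is a hypothetical class, the address namenode:8020 is an assumed example, and an unchecked IllegalArgumentException is used in place of the IOException thrown by the real method.

```java
import java.net.URI;

/** Reduces a full URI to scheme://authority, rejecting URIs without a host. */
class BaseUriDemo {

  static URI baseUri(URI uri) {
    if (uri.getHost() == null) {
      throw new IllegalArgumentException("Incomplete URI, no host: " + uri);
    }
    // Path, query and fragment are dropped; only scheme and authority remain.
    return URI.create(uri.getScheme() + "://" + uri.getAuthority());
  }

  public static void main(String[] args) {
    URI full = URI.create("hdfs://namenode:8020/user/root/word2.txt");
    System.out.println(baseUri(full));
  }
}
```

A URI such as hdfs:///tmp has no host, so it would be rejected by the first check, which matches the "Incomplete HDFS URI, no host" error in the listing.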



