Hadoop源码分析:HDFS读取文件

Hadoop源码分析:HDFS读取文件

上一篇博客http://blog.csdn.net/chengyuqiang/article/details/78636721分析了HDFS的DistributedFileSystem对象的创建过程。 然后就可以按照HDFS的API对HDFS中的文件和目录进行操作了,如列出某个目录中的文件和子目录、读取文件、写入文件等。

1.1 FileSystem.open()

与使用Java IO读取本地文件类似,读取HDFS文件其实就是创建一个文件输入流,在Hadoop中使用FileSystem.open()方法来创建输入流,open()方法是我们此次源码分析的入口。

    public static void readFile(String filePath) throws IOException{
        FileSystem fs = getFileSystem(filePath);
        InputStream in=null;
        try{
            in=fs.open(new Path(filePath));
            IOUtils.copyBytes(in, System.out,4096,false);
        }catch(Exception e){
            System.out.println(e.getMessage());
        }finally{
            IOUtils.closeStream(in);
        }
    }

1.2 进入open(Path f)方法

该方法返回的是一个FSDataInputStream对象。

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file to open
   */
  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }

1.3 进入 open(Path f, int bufferSize)方法

发现这是个抽象方法

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;

下面进入DistributedFileSystem类的open(Path f, int bufferSize)方法

  @Override
  public FSDataInputStream open(Path f, final int bufferSize)
      throws IOException {
    //统计信息
    statistics.incrementReadOps(1);
    //相对路径转换为绝对路径
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override//核心方法
      public FSDataInputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }

在该方法中,statistics是一个org.apache.hadoop.fs.FileSystem.Statistics类型,它实现了文件系统读写过程中的一些统计,例如自从该HDFS对象建立以来,读了多少字节、写了多少字节等。在返回结果的时候,创建了一个FileSystemLinkResolver对象,并实现了此类的两个抽象方法, 最后调用了resolve()方法,其中doCall()方法和next()方法都在resolve()方法里用到了,只是next()方法只是在resolve()方法异常捕获时才调用。所以跟踪doCall()方法,doCall()方法里的open()方法有3个参数其中src表示要打开的文件路径,buffersize表示缓冲大小,verifyChecksum表示是否校验和。

1.4 进入dfs.open(String src, int buffersize, boolean verifyChecksum)方法

  /**
   * Create an input stream that obtains a nodelist from the
   * namenode, and then reads from all the right places.  Creates
   * inner subclass of InputStream that does the right out-of-band
   * work.
   */
  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException, UnresolvedLinkException {
    checkOpen();
    //    Get block info from namenode
    TraceScope scope = getPathTraceScope("newDFSInputStream", src);
    try {
      return new DFSInputStream(this, src, verifyChecksum);
    } finally {
      scope.close();
    }
  }

checkOpen()方法表示检查文件系统是否已经打开,如果没有打开,则抛出异常–FileSystem closed。最后,在这个方法中调用了DFSClient.DFSInputStream()的构造方法,创建DFSInputStream输入流对象并返回(DFSInputStream是对客户端读取的输入流的抽象)。

1.5 进入该DFSInputStream构造方法

该方法先是做了一些准备工作,然后调用openInfo()方法,openInfo()方法是一个线程安全的方法,作用是从namenode获取要打开的文件的数据块信息。也就是说主要是为locatedBlocks对象赋值。

@InterfaceAudience.Private
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    DataEncryptionKeyFactory {
  ...

  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
                 ) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.src = src;
    synchronized (infoLock) {
      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
    }
    openInfo();
  }
  ...
}

1.6 进入openInfo()方法

该方法中如果读取数据块信息失败,则会再次读取3次,主要调用了方法fetchLocatedBlocksAndGetLastBlockLength()方法来读取数据块的信息。该方法名字虽然长,但是说的很明白,即读取数据块信息并且获得最后一个数据块的长度。为什么偏偏要获取最后一个数据块的长度呢?因为之前的数据块大小固定嘛,如果是默认的,那就是128M,而最后一块大小就不一定了,有必要获取下。

 /**
   * Grab the open-file info from namenode
   */
  void openInfo() throws IOException, UnresolvedLinkException {
    synchronized(infoLock) {
      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
      while (retriesForLastBlockLength > 0) {
        // Getting last block length as -1 is a special case. When cluster
        // restarts, DNs may not report immediately. At this time partial block
        // locations will not be available with NN for getting the length. Lets
        // retry for 3 times to get the length.
        if (lastBlockBeingWrittenLength == -1) {
          DFSClient.LOG.warn("Last block locations not available. "
              + "Datanodes might not have reported blocks completely."
              + " Will retry for " + retriesForLastBlockLength + " times");
          waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
        } else {
          break;
        }
        retriesForLastBlockLength--;
      }
      if (retriesForLastBlockLength == 0) {
        throw new IOException("Could not obtain the last block locations.");
      }
    }
  }

1.7 进入fetchLocatedBlocksAndGetLastBlockLength()方法

  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    //此处又出现了dfsClient,该对象负责与NameNode通信,由此可知getLocatedBlocks()方法实现了远程通信
    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("newInfo = " + newInfo);
    }
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }

    if (locatedBlocks != null) {
      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
          throw new IOException("Blocklist for " + src + " has changed!");
        }
      }
    }
    locatedBlocks = newInfo;
    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          if (last.getBlockSize() == 0) {
            // if the length is zero, then no data has been written to
            // datanode. So no need to wait for the locations.
            return 0;
          }
          return -1;
        }
        final long len = readBlockLength(last);
        last.getBlock().setNumBytes(len);
        lastBlockBeingWrittenLength = len; 
      }
    }

    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();

    return lastBlockBeingWrittenLength;
  }

1.8 进入getLocatedBlocks(String src, long start)方法

  public LocatedBlocks getLocatedBlocks(String src, long start)
      throws IOException {
    return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
  }

进入getLocatedBlocks(String src, long start, long length)方法

  /*
   * This is just a wrapper around callGetBlockLocations, but non-static so that
   * we can stub it out for tests.
   */
  @VisibleForTesting
  public LocatedBlocks getLocatedBlocks(String src, long start, long length)
      throws IOException {
    TraceScope scope = getPathTraceScope("getBlockLocations", src);
    try {
      //此处终于见到了namenode 
      return callGetBlockLocations(namenode, src, start, length);
    } finally {
      scope.close();
    }
  }

1.9 进入callGetBlockLocations()方法

进入callGetBlockLocations(ClientProtocol namenode,String src, long start, long length)方法

  /**
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) 
      throws IOException {
    try {
      //调用namenode对象,进行远程调用
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    }
  }

callGetBlockLocations(ClientProtocol namenode,String src, long start, long length)方法涉及RPC的远程调用,后面再继续研读。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏coolblog.xyz技术专栏

Spring IOC 容器源码分析 - 创建单例 bean 的过程

在上一篇文章中,我比较详细的分析了获取 bean 的方法,也就是getBean(String)的实现逻辑。对于已实例化好的单例 bean,getBean(Str...

2067
来自专栏古时的风筝

模拟实现Spring中的注解装配

在Spring中,XML文件中的bean配置是实现Spring IOC的核心配置文件,在早版本的Spring中,只能基于XML配置文件,配置各个对象之间的依赖关...

1915
来自专栏Java Edge

IoC容器初始化过程(下)1 BeanDefinition的载入和解析 2 BeanDefinition在IoC容器中的注册

3508
来自专栏Java Edge

IoC容器的初始化过程(上)1 BeanDefinition的Resource定位

2877
来自专栏Java编程技术

常用开源框架中设计模式使用分析-工厂模式(Factory Pattern)

工厂模式是创建型模式,他封装了对象的创建过程,调用者使用具体的工厂方法根据参数就可以获取对应的对象。

472
来自专栏电光石火

@Autowired和@Resource的区别

用途:做bean的注入时使用 历史:@Autowired        属于Spring的注解                org.springfr...

3174
来自专栏xingoo, 一个梦想做发明家的程序员

基于Dubbo的http自动测试工具分享

公司是采用微服务来做模块化的,各个模块之间采用dubbo通信。好处就不用提了,省略了之前模块间复杂的http访问。不过也遇到一些问题: PS: Githu...

2288
来自专栏coolblog.xyz技术专栏

Spring IOC 容器源码分析 - 循环依赖的解决办法

本文,我们来看一下 Spring 是如何解决循环依赖问题的。在本篇文章中,我会首先向大家介绍一下什么是循环依赖。然后,进入源码分析阶段。为了更好的说明 Spri...

1675
来自专栏chenssy

【死磕 Spring】----- IOC 之 IOC 初始化总结

前面 13 篇博文从源码层次分析了 IOC 整个初始化过程,这篇就这些内容做一个总结将其连贯起来。

181
来自专栏Java编程技术

论Spring中循环依赖的正确性与Bean注入的顺序关系

最近在做项目时候遇到一个奇葩问题,就是bean依赖注入的正确性与bean直接注入的顺序有关系,但是正常情况下明明是和顺序没关系的啊,究竟啥情况那,不急,让我一一...

842

扫码关注云+社区