Hadoop源码分析:HDFS读取文件

Hadoop源码分析:HDFS读取文件

上一篇博客http://blog.csdn.net/chengyuqiang/article/details/78636721分析了HDFS的DistributedFileSystem对象的创建过程。 然后就可以按照HDFS的API对HDFS中的文件和目录进行操作了,如列出某个目录中的文件和子目录、读取文件、写入文件等。

1.1 FileSystem.open()

与使用Java IO读取本地文件类似,读取HDFS文件其实就是创建一个文件输入流,在Hadoop中使用FileSystem.open()方法来创建输入流,open()方法是我们此次源码分析的入口。

    public static void readFile(String filePath) throws IOException{
        FileSystem fs = getFileSystem(filePath);
        InputStream in=null;
        try{
            in=fs.open(new Path(filePath));
            IOUtils.copyBytes(in, System.out,4096,false);
        }catch(Exception e){
            System.out.println(e.getMessage());
        }finally{
            IOUtils.closeStream(in);
        }
    }

1.2 进入open(Path f)方法

该方法返回的是一个FSDataInputStream对象。

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file to open
   */
  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }

1.3 进入 open(Path f, int bufferSize)方法

发现这是个抽象方法

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;

下面进入DistributedFileSystem类的open(Path f, int bufferSize)方法

  @Override
  public FSDataInputStream open(Path f, final int bufferSize)
      throws IOException {
    //统计信息
    statistics.incrementReadOps(1);
    //相对路径转换为绝对路径
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override//核心方法
      public FSDataInputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }

在该方法中,statistics是一个org.apache.hadoop.fs.FileSystem.Statistics类型,它实现了文件系统读写过程中的一些统计,例如自从该HDFS对象建立以来,读了多少字节、写了多少字节等。在返回结果的时候,创建了一个FileSystemLinkResolver对象,并实现了此类的两个抽象方法, 最后调用了resolve()方法,其中doCall()方法和next()方法都在resolve()方法里用到了,只是next()方法只是在resolve()方法异常捕获时才调用。所以跟踪doCall()方法,doCall()方法里的open()方法有3个参数其中src表示要打开的文件路径,buffersize表示缓冲大小,verifyChecksum表示是否校验和。

1.4 进入dfs.open(String src, int buffersize, boolean verifyChecksum)方法

  /**
   * Create an input stream that obtains a nodelist from the
   * namenode, and then reads from all the right places.  Creates
   * inner subclass of InputStream that does the right out-of-band
   * work.
   */
  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException, UnresolvedLinkException {
    checkOpen();
    //    Get block info from namenode
    TraceScope scope = getPathTraceScope("newDFSInputStream", src);
    try {
      return new DFSInputStream(this, src, verifyChecksum);
    } finally {
      scope.close();
    }
  }

checkOpen()方法表示检查文件系统是否已经打开,如果没有打开,则抛出异常–FileSystem closed。最后,在这个方法中调用了DFSClient.DFSInputStream()的构造方法,创建DFSInputStream输入流对象并返回(DFSInputStream是对客户端读取的输入流的抽象)。

1.5 进入该DFSInputStream构造方法

该方法先是做了一些准备工作,然后调用openInfo()方法,openInfo()方法是一个线程安全的方法,作用是从namenode获取要打开的文件的数据块信息。也就是说主要是为locatedBlocks对象赋值。

@InterfaceAudience.Private
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    DataEncryptionKeyFactory {
  ...

  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
                 ) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.src = src;
    synchronized (infoLock) {
      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
    }
    openInfo();
  }
  ...
}

1.6 进入openInfo()方法

该方法中如果读取数据块信息失败,则会再次读取3次,主要调用了方法fetchLocatedBlocksAndGetLastBlockLength()方法来读取数据块的信息。该方法名字虽然长,但是说的很明白,即读取数据块信息并且获得最后一个数据块的长度。为什么偏偏要获取最后一个数据块的长度呢?因为之前的数据块大小固定嘛,如果是默认的,那就是128M,而最后一块大小就不一定了,有必要获取下。

 /**
   * Grab the open-file info from namenode
   */
  void openInfo() throws IOException, UnresolvedLinkException {
    synchronized(infoLock) {
      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
      while (retriesForLastBlockLength > 0) {
        // Getting last block length as -1 is a special case. When cluster
        // restarts, DNs may not report immediately. At this time partial block
        // locations will not be available with NN for getting the length. Lets
        // retry for 3 times to get the length.
        if (lastBlockBeingWrittenLength == -1) {
          DFSClient.LOG.warn("Last block locations not available. "
              + "Datanodes might not have reported blocks completely."
              + " Will retry for " + retriesForLastBlockLength + " times");
          waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
        } else {
          break;
        }
        retriesForLastBlockLength--;
      }
      if (retriesForLastBlockLength == 0) {
        throw new IOException("Could not obtain the last block locations.");
      }
    }
  }

1.7 进入fetchLocatedBlocksAndGetLastBlockLength()方法

  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    //此处又出现了dfsClient,该对象负责与NameNode通信,由此可知getLocatedBlocks()方法实现了远程通信
    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("newInfo = " + newInfo);
    }
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }

    if (locatedBlocks != null) {
      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
          throw new IOException("Blocklist for " + src + " has changed!");
        }
      }
    }
    locatedBlocks = newInfo;
    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          if (last.getBlockSize() == 0) {
            // if the length is zero, then no data has been written to
            // datanode. So no need to wait for the locations.
            return 0;
          }
          return -1;
        }
        final long len = readBlockLength(last);
        last.getBlock().setNumBytes(len);
        lastBlockBeingWrittenLength = len; 
      }
    }

    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();

    return lastBlockBeingWrittenLength;
  }

1.8 进入getLocatedBlocks(String src, long start)方法

  public LocatedBlocks getLocatedBlocks(String src, long start)
      throws IOException {
    return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
  }

进入getLocatedBlocks(String src, long start, long length)方法

  /*
   * This is just a wrapper around callGetBlockLocations, but non-static so that
   * we can stub it out for tests.
   */
  @VisibleForTesting
  public LocatedBlocks getLocatedBlocks(String src, long start, long length)
      throws IOException {
    TraceScope scope = getPathTraceScope("getBlockLocations", src);
    try {
      //此处终于见到了namenode 
      return callGetBlockLocations(namenode, src, start, length);
    } finally {
      scope.close();
    }
  }

1.9 进入callGetBlockLocations()方法

进入callGetBlockLocations(ClientProtocol namenode,String src, long start, long length)方法

  /**
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) 
      throws IOException {
    try {
      //调用namenode对象,进行远程调用
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    }
  }

callGetBlockLocations(ClientProtocol namenode,String src, long start, long length)方法涉及RPC的远程调用,后面再继续研读。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java帮帮-微信公众号-技术文章全总结

Spring-AOP

AOP引介 AOP(Aspect Oriented Programing)面向切面编程采用横向抽取机制,以取代传统的纵向继承体系的重复性代码(如性能监控/事务管...

3268
来自专栏MasiMaro 的技术博文

PE解析器的编写(三)——区块表的解析

PE文件中所有节的属性都被定义在节表中,节表由一系列的IMAGE_SECTION_HEADER结构排列而成,每个结构用来描述一个节,结构的排列顺序和它们描述的节...

842
来自专栏一枝花算不算浪漫

[Spring框架]Spring AOP基础入门总结二:Spring基于AspectJ的AOP的开发.

3108
来自专栏CSDN技术头条

Spring 框架和 Tomcat 容器扩展接口揭秘

在 Spring 框架中,每个应用程序上下文(ApplicationContext)管理着一个 BeanFactory,BeanFactory 主要负责 Bea...

754
来自专栏JackieZheng

Spring Boot系列——AOP配自定义注解的最佳实践

AOP(Aspect Oriented Programming),即面向切面编程,是Spring框架的大杀器之一。

592
来自专栏JAVA烂猪皮

Spring IoC中各个注解的理解和使用

一、把在Spring的xml文件中配置bean改为Spring的注解来配置bean

523
来自专栏xingoo, 一个梦想做发明家的程序员

【Spring实战】—— 2 构造注入

本文讲解了构造注入以及spring的基本使用方式,通过一个杂技演员的例子,讲述了依赖注入属性或者对象的使用方法。   如果想要使用spring来实现依赖注...

1948
来自专栏石奈子的Java之路

原 SpringBoot 2.0 系列00

1234
来自专栏青枫的专栏

day33_Spring学习笔记_01

4 + 1:4个核心jar包(beans、core、context、expression)+ 1个依赖jar包(com.springsource.org.apa...

401
来自专栏JackieZheng

RabbitMQ入门-Topic模式

上篇《RabbitMQ入门-Routing直连模式》我们介绍了可以定向发送消息,并可以根据自定义规则派发消息。看起来,这个Routing模式已经算灵活的了,但是...

22610

扫码关注云+社区