Hadoop Source Code Analysis: Reading a File from HDFS

The previous post, http://blog.csdn.net/chengyuqiang/article/details/78636721, analyzed how the DistributedFileSystem object is created. With that object in hand, we can use the HDFS API to operate on files and directories: list the files and subdirectories of a directory, read files, write files, and so on.

1.1 FileSystem.open()

Much like reading a local file with Java IO, reading an HDFS file starts by creating a file input stream. In Hadoop that stream is created with FileSystem.open(), which is the entry point of this source-code walkthrough.

    public static void readFile(String filePath) throws IOException {
        FileSystem fs = getFileSystem(filePath);
        InputStream in = null;
        try {
            // Open an input stream on the given path and copy it to stdout.
            in = fs.open(new Path(filePath));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            IOUtils.closeStream(in);
        }
    }
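
The getFileSystem() helper called above is not part of Hadoop; it is a convenience method of the example class. A minimal sketch of what it might look like, assuming the path is a full URI such as hdfs://namenode:9000/... (this helper is an assumption, not the original author's code):

    // Hypothetical helper assumed by readFile(): obtain a FileSystem for the
    // path's URI from the default Hadoop configuration.
    // Needs: java.net.URI, org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.FileSystem
    public static FileSystem getFileSystem(String filePath) throws IOException {
        Configuration conf = new Configuration();          // loads core-site.xml / hdfs-site.xml
        return FileSystem.get(URI.create(filePath), conf); // the scheme decides the concrete FileSystem
    }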

1.2 Entering the open(Path f) method

This method returns an FSDataInputStream object.

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file to open
   */
  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }
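
As this wrapper shows, the read buffer size comes from io.file.buffer.size and falls back to 4096 bytes. A minimal, hypothetical sketch of overriding it before opening a file (the URI and path are made-up values):

    // Hypothetical example: raise the read buffer from the 4 KB default to 128 KB.
    Configuration conf = new Configuration();
    conf.setInt("io.file.buffer.size", 128 * 1024);
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf);
    FSDataInputStream in = fs.open(new Path("/tmp/sample.txt"));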

1.3 Entering the open(Path f, int bufferSize) method

In FileSystem this turns out to be an abstract method:

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;

So we move on to DistributedFileSystem's implementation of open(Path f, int bufferSize):

  @Override
  public FSDataInputStream open(Path f, final int bufferSize)
      throws IOException {
    // update the read-operation statistics
    statistics.incrementReadOps(1);
    // convert a relative path into an absolute one
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override // the core call: open the underlying DFS input stream
      public FSDataInputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
        return dfs.createWrappedInputStream(dfsis);
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }

In this method, statistics is an org.apache.hadoop.fs.FileSystem.Statistics object that records read/write statistics for the file system, such as how many bytes have been read and written since this FileSystem instance was created. To build the return value, the method creates a FileSystemLinkResolver, implements its two abstract methods, and finally calls resolve(). Both doCall() and next() are invoked from inside resolve(), but next() is only used when resolve() catches an UnresolvedLinkException, i.e. when the path crosses a symbolic link. So we follow doCall(). The open() call inside doCall() takes three arguments: src, the path of the file to open; buffersize, the buffer size; and verifyChecksum, whether checksums should be verified.
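
The contract between doCall(), next() and resolve() can be pictured with the following simplified sketch. This is not the real FileSystemLinkResolver implementation (which also re-resolves the target FileSystem and caps the number of symlink hops); it only illustrates the control flow described above:

    // Simplified, hypothetical sketch of the doCall()/next()/resolve() contract.
    abstract class LinkResolverSketch<T> {
        // The "real work" against the target file system (e.g. dfs.open(...)).
        abstract T doCall(Path p) throws IOException, UnresolvedLinkException;

        // Fallback used when the path crosses a symlink into another FileSystem.
        abstract T next(FileSystem fs, Path p) throws IOException;

        T resolve(FileSystem fs, Path p) throws IOException {
            try {
                return doCall(p);            // normal case
            } catch (UnresolvedLinkException e) {
                // The real code resolves the link target and may retry doCall()
                // or hand off to next(); here we simply hand off.
                return next(fs, p);
            }
        }
    }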

1.4 Entering the dfs.open(String src, int buffersize, boolean verifyChecksum) method

  /**
   * Create an input stream that obtains a nodelist from the
   * namenode, and then reads from all the right places.  Creates
   * inner subclass of InputStream that does the right out-of-band
   * work.
   */
  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException, UnresolvedLinkException {
    checkOpen();
    //    Get block info from namenode
    TraceScope scope = getPathTraceScope("newDFSInputStream", src);
    try {
      return new DFSInputStream(this, src, verifyChecksum);
    } finally {
      scope.close();
    }
  }

checkOpen() verifies that the client is still open; if it has already been closed, an IOException with the message "Filesystem closed" is thrown. The method then calls the DFSInputStream constructor, creating and returning a DFSInputStream object (the client-side abstraction of an input stream for reading HDFS data).
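
checkOpen() itself is tiny; it boils down to something like the following (a paraphrased sketch rather than a verbatim quote of the DFSClient source):

    // Paraphrased sketch: once DFSClient.close() flips the running flag,
    // every later call fails fast with "Filesystem closed".
    void checkOpen() throws IOException {
        if (!clientRunning) {
            throw new IOException("Filesystem closed");
        }
    }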

1.5 Entering the DFSInputStream constructor

The constructor does a little bookkeeping and then calls openInfo(). openInfo() is thread-safe (it synchronizes on infoLock) and fetches the block information of the file being opened from the NameNode; its main job is to populate the locatedBlocks field.

@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream
    implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    HasEnhancedByteBufferAccess, CanUnbuffer {
  ...

  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
                 ) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.src = src;
    synchronized (infoLock) {
      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
    }
    openInfo();
  }
  ...
}
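
As an aside, the verifyChecksum flag that ends up in this constructor can be toggled from user code through the public FileSystem API. A small, hypothetical example (the URI and path are made up):

    // Hypothetical example: disable client-side checksum verification for reads.
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), new Configuration());
    fs.setVerifyChecksum(false);                  // affects subsequent open() calls
    InputStream in = fs.open(new Path("/tmp/sample.txt"));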

1.6 Entering the openInfo() method

If the block information cannot be obtained (the length of the last block comes back as -1), this method retries up to three more times. The real work is delegated to fetchLocatedBlocksAndGetLastBlockLength(), whose long name says exactly what it does: fetch the located blocks and determine the length of the last block. Why single out the last block? Every preceding block has the fixed block size (128 MB by default), but the last block can be anything up to that limit, so its length has to be fetched explicitly.

 /**
   * Grab the open-file info from namenode
   */
  void openInfo() throws IOException, UnresolvedLinkException {
    synchronized(infoLock) {
      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
      while (retriesForLastBlockLength > 0) {
        // Getting last block length as -1 is a special case. When cluster
        // restarts, DNs may not report immediately. At this time partial block
        // locations will not be available with NN for getting the length. Lets
        // retry for 3 times to get the length.
        if (lastBlockBeingWrittenLength == -1) {
          DFSClient.LOG.warn("Last block locations not available. "
              + "Datanodes might not have reported blocks completely."
              + " Will retry for " + retriesForLastBlockLength + " times");
          waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
        } else {
          break;
        }
        retriesForLastBlockLength--;
      }
      if (retriesForLastBlockLength == 0) {
        throw new IOException("Could not obtain the last block locations.");
      }
    }
  }
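
To make the point about the last block concrete, here is a quick, hypothetical illustration assuming the default 128 MB block size (the 300 MB file length is made up):

    // Only the final block of a file has an "irregular" length; every
    // preceding block is exactly dfs.blocksize bytes long.
    long blockSize = 128L * 1024 * 1024;            // default dfs.blocksize
    long fileLength = 300L * 1024 * 1024;           // a made-up 300 MB file
    long fullBlocks = fileLength / blockSize;       // 2 full 128 MB blocks
    long lastBlockLength = fileLength % blockSize;  // 44 MB left in the last block
    System.out.println(fullBlocks + " full blocks + last block of "
            + lastBlockLength + " bytes");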

1.7 Entering the fetchLocatedBlocksAndGetLastBlockLength() method

  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    // dfsClient shows up again; it is the object that talks to the NameNode, so getLocatedBlocks() involves a remote call
    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("newInfo = " + newInfo);
    }
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }

    if (locatedBlocks != null) {
      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
          throw new IOException("Blocklist for " + src + " has changed!");
        }
      }
    }
    locatedBlocks = newInfo;
    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          if (last.getBlockSize() == 0) {
            // if the length is zero, then no data has been written to
            // datanode. So no need to wait for the locations.
            return 0;
          }
          return -1;
        }
        final long len = readBlockLength(last);
        last.getBlock().setNumBytes(len);
        lastBlockBeingWrittenLength = len; 
      }
    }

    fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();

    return lastBlockBeingWrittenLength;
  }

1.8 Entering the getLocatedBlocks(String src, long start) method

  public LocatedBlocks getLocatedBlocks(String src, long start)
      throws IOException {
    return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
  }
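
The missing length argument is filled in with dfsClientConf.prefetchSize, which caps how much block metadata the client asks the NameNode for in one call. As far as I can tell it is controlled by dfs.client.read.prefetch.size and defaults to ten times the block size; the key name and default in this sketch are assumptions worth double-checking:

    // Assumed key and default (10 * block size) for the prefetch window.
    Configuration conf = new Configuration();
    long blockSize = conf.getLongBytes("dfs.blocksize", 128L * 1024 * 1024);
    long prefetchSize = conf.getLong("dfs.client.read.prefetch.size", 10 * blockSize);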

Then into the getLocatedBlocks(String src, long start, long length) method:

  /*
   * This is just a wrapper around callGetBlockLocations, but non-static so that
   * we can stub it out for tests.
   */
  @VisibleForTesting
  public LocatedBlocks getLocatedBlocks(String src, long start, long length)
      throws IOException {
    TraceScope scope = getPathTraceScope("getBlockLocations", src);
    try {
      // here, at last, the namenode proxy appears
      return callGetBlockLocations(namenode, src, start, length);
    } finally {
      scope.close();
    }
  }

1.9 Entering the callGetBlockLocations() method

Its full signature is callGetBlockLocations(ClientProtocol namenode, String src, long start, long length):

  /**
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) 
      throws IOException {
    try {
      // invoke the namenode proxy, which performs the remote call
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    }
  }

callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) goes through Hadoop RPC: namenode here is a ClientProtocol proxy, so getBlockLocations() actually executes on the NameNode. The RPC layer itself will be examined in a later post.
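
Even without going into the RPC layer, the result of this NameNode round trip can be observed from the public API. A minimal, hypothetical example (run inside a method that throws IOException; the URI and path are made up) that prints a file's block locations via FileSystem.getFileBlockLocations():

    // Hypothetical example: list each block of a file and the DataNodes holding it.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf);
    Path file = new Path("/tmp/sample.txt");
    FileStatus status = fs.getFileStatus(file);
    BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
    for (BlockLocation b : blocks) {
        System.out.println("offset=" + b.getOffset() + " length=" + b.getLength()
                + " hosts=" + java.util.Arrays.toString(b.getHosts()));
    }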
