Hadoop源码篇--Client源码

一。前述

今天起剖析源码,先从Client看起,因为Client在MapReduce的过程中承担了很多重要的角色。

二。MapReduce框架主类

代码如下:

public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration(true);
        //job  作业
        Job  job = Job.getInstance(conf);
        
         // Create a new Job
//         Job job = Job.getInstance();
         job.setJarByClass(MyWC.class);
         
         // Specify various job-specific parameters     
         job.setJobName("myjob");
         
//         job.setInputPath(new Path("in"));
//         job.setOutputPath(new Path("out"));
         
         Path input = new Path("/user/root");
        FileInputFormat.addInputPath(job, input );
         
         Path output = new Path("/output/wordcount");
         if(output.getFileSystem(conf).exists(output)){
             output.getFileSystem(conf).delete(output,true);
         }
        FileOutputFormat.setOutputPath(job, output );
         
         
         
         
         job.setMapperClass(MyMapper.class);
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(IntWritable.class);
         job.setReducerClass(MyReducer.class);

         // Submit the job, then poll for progress until the job is complete
         job.waitForCompletion(true);

第一步,先分析Job,可以看见源码中Job实现了public class Job extends JobContextImpl implements JobContext

然后JobContext实现了 MRJobConfig,可以看见其中有很多配置

因为job中传的参数为conf,所以这里的配置即对应我们的配置文件中的属性值。

  Job  job = Job.getInstance(conf);

 挑几个重要的看下:

public static final int DEFAULT_MAP_MEMORY_MB = 1024;//默认的Mapper任务内存大小。

第二步,分析提交过程 job.waitForCompletion(true);   追踪源码发现主要实现这个类

JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException
  1. Checking the input and output specifications of the job.//检查输入输出路径
  2. Computing the InputSplits for the job.//检查切片
  3. Setup the requisite accounting information for the DistributedCache of the job, if necessary.
  4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
  5. Submitting the job to the JobTracker and optionally monitoring it's status.

在此方法中,中重点看下此方法 int maps = writeSplits(job, submitJobDir);

追踪后具体实现可知

private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }


追踪job.getInputFormatClass()可以发现如下代码: 

public Class<? extends InputFormat<?,?>> getInputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends InputFormat<?,?>>) 
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
//根据用户配置文件首先取用,如果没有被取用则使用默认输入格式TextInputFormat
  }

所以可得知用户的默认输入类是TextInputformat类并且继承关系如下:

TextInputforMat-->FileinputFormat-->InputFormat

 追踪 List<InputSplit> splits = input.getSplits(job);可以得到如下源码:

最为重要的一个源码!!!!!!!!!!!

public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));如果用户设置则取用户,没有是1
    long maxSize = getMaxSplitSize(job);//如果用户设置则取用户,没有取最大值

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();//取输入文件的大小和路径
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);//获得所有块的位置。
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);//获得切片大小

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//这一块传参传的是切块的偏移量,返回这个块的索引
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),//根据当前块的索引号取出来块的位置包括副本的位置 然后传递给切片,然后切片知道往哪运算。即往块的位置信息计算
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }
 1.long splitSize = computeSplitSize(blockSize, minSize, maxSize);追踪源码发现
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

 切片大小默认是块的大小!!!!

假如让切片大小 < 块的大小则更改配置的最大值MaxSize,让其小于blocksize

假如让切片大小 > 块的大小则更改配置的最小值MinSize,让其大于blocksize

通过FileInputFormat.setMinInputSplitSize即可。

 2. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining) 追踪源码发现

 protected int getBlockIndex(BlockLocation[] blkLocations, 
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){//切片要大于>=块的起始量,小于一个块的末尾量。
        return i;//返回这个块
      }
    }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset + 
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

 3. splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()

创建切片的时候,一个切片对应一个mapperr任务,所以创建切片的四个位置(path,0,10,host)

根据host可知mapper任务的计算位置,则对应计算向数据移动!!!!块是逻辑的,并没有真正切割数据。!!

4.上述getSplits方法最终得到一个切片的清单,清单的数目就是mapper的数量!!即开始方法的入口 int maps = writeSplits(job, submitJobDir);返回值。

5.计算向数据移动时会拉取只属于自己的文件。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java Web

JavaFX-TableView详解

前言 最近在着手一个学生管理系统的编写,涉及到TableView的使用,这前前后后的也有了些经验和想法想要记录和分享一下(事实上我正在想要用html网页代替界面...

3936
来自专栏编码小白

tomcat请求处理分析(六)servlet的处理过程

1.1.1.1  servlet的解析过程 servlet的解析分为两步实现,第一个是匹配到对应的Wrapper,第二个是加载对应的servlet并进行数据,这...

7107
来自专栏Android 开发学习

JsBridge 源码分析

1733
来自专栏Flutter入门

Weex是如何在Android客户端上跑起来的

Weex可以通过自己设计的DSL,书写.we文件或者.vue文件来开发界面,整个页面书写分成了3段,template、style、script,借鉴了成熟的MV...

4275
来自专栏大内老A

为ASP.NET MVC创建一个基于Unity的ControllerFactory

谈到IoC和ASP.NET的集成,很多人会先后想到Ninject,不过我们个人还是倾向于Unity。这篇文章简单地介绍如果创建基于Unity的Controlle...

1937
来自专栏恰童鞋骚年

Hadoop学习笔记—4.初识MapReduce

  MapReduce是Google的一项重要技术,它首先是一个编程模型,用以进行大数据量的计算。对于大数据量的计算,通常采用的处理手法就是并行计算。但对许多开...

912
来自专栏何俊林

LeakCanary的原理,你知道么?

2812
来自专栏JackieZheng

Spring读书笔记——bean加载

我们的日常开发几乎离不开Spring,他为我们的开发带来了很大的便捷,那么Spring框架是如何做到方便他人的呢。今天就来说说bean如何被加载加载。 我们在x...

2199
来自专栏分布式系统进阶

Librdkafka的操作处理队列

2732
来自专栏潇涧技术专栏

Lint Tool Analysis (2)

本系列的几篇源码分析文档意义不大,如果你正好也在研究lint源码,或者你想知道前面自定义lint规则中提出的那几个问题,抑或你只是想大致了解下lint的源码都有...

1631

扫码关注云+社区

领取腾讯云代金券