Hadoop旧mapreduce的map任务切分原理

前言

最近在工作过程中接触一些Hive数据仓库中的表,这些表实际是从关系型数据库通过Sqoop抽到Hive的。在开发过程中对map任务的划分进行性能调优,发现mapreduce中关于FileInputFormat的参数调整都不起作用,最后发现这些老任务都是用旧版的mapreduce开发的,于是顺便研究下旧版mapreduce的任务划分策略。有关新版mapreduce的任务划分策略,大家可以参考我之前的博文《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》。

源码分析

根据《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》一文的内容,我们知道map任务的划分关键在于FileInputFormat的getSplits方法的实现策略,现在我们来看看其源码:

    public InputSplit[] getSplits(JobConf job, int numSplits)  

      throws IOException {  
      Stopwatch sw = new Stopwatch().start();  
      FileStatus[] files = listStatus(job);  
 
      // Save the number of input files for metrics/loadgen  
      job.setLong(NUM_INPUT_FILES, files.length);  
      long totalSize = 0;                           // compute total size  
      for (FileStatus file: files) {                // check we have valid files  
        if (file.isDirectory()) {  
          throw new IOException("Not a file: "+ file.getPath());  
        }  
        totalSize += file.getLen();  
      }  
 
      long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);  
      long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.  
        FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);  
 
      // generate splits  
      ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);  
      NetworkTopology clusterMap = new NetworkTopology();  
      for (FileStatus file: files) {  
        Path path = file.getPath();  
        long length = file.getLen();  
        if (length != 0) {  
          FileSystem fs = path.getFileSystem(job);  
          BlockLocation[] blkLocations;  
          if (file instanceof LocatedFileStatus) {  
            blkLocations = ((LocatedFileStatus) file).getBlockLocations();  
          } else {  
            blkLocations = fs.getFileBlockLocations(file, 0, length);  
          }  
          if (isSplitable(fs, path)) {  
            long blockSize = file.getBlockSize();  
            long splitSize = computeSplitSize(goalSize, minSize, blockSize);  
 
            long bytesRemaining = length;  
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
              String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,  
                  length-bytesRemaining, splitSize, clusterMap);  
              splits.add(makeSplit(path, length-bytesRemaining, splitSize,  
                  splitHosts[0], splitHosts[1]));  
              bytesRemaining -= splitSize;  
            }  
 
            if (bytesRemaining != 0) {  
              String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length  
                  - bytesRemaining, bytesRemaining, clusterMap);  
              splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,  
                  splitHosts[0], splitHosts[1]));  
            }  
          } else {  
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);  
            splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));  
          }  
        } else {   
          //Create empty hosts array for zero length files  
          splits.add(makeSplit(path, 0, length, new String[0]));  
        }  
      }  
      sw.stop();  
      if (LOG.isDebugEnabled()) {  
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()  
            + ", TimeTaken: " + sw.elapsedMillis());  
      }  
      return splits.toArray(new FileSplit[splits.size()]);  
    }  
 
    protected long computeSplitSize(long goalSize, long minSize,  
                                         long blockSize) {  
      return Math.max(minSize, Math.min(goalSize, blockSize));  
    }  

这里对以上代码的划分策略进行整理:

  1. 遍历当前作业的所有输入文件,然后将累积这些文件的字节数并保存到变量totalSize中;
  2. 如果用户指定了mapreduce.job.maps参数,那么这个参数会被保存在入参numSplits中;
  3. 用户想要通过numSplits控制map任务的数量,那么需求对totalSize进行平分,以便确定每个map任务划分的输入大小。这个计算很简单,即使用totalSize除以numSplits,最后得到的目标划分大小存储在变量goalSize中;
  4. 常量SPLIT_MINSIZE实际是由参数mapreduce.input.fileinputformat.split.minsize来控制的,如果没有配置则默认是1。minSplitSize默认是1,切旧版FileIntputFormat没有设置此变量的地方。最后取SPLIT_MINSIZE和minSplitSize的最大值,并保存在变量minSize中;
  5. 遍历当前作业的每个输入文件,计算每个输入文件,将被划分的任务数量,最后将每个文件划分的任务数量合并起来就是整个作业划分的任务数量。

以上只是总体分析了作业的任务划分,有关每个输入文件的任务数量划分步骤如下:

  1. 判断文件的大小,只有文件字节数大于0才是有意义的;
  2. 判断文件是否是可以切分的,只有能够切分的文件才会继续进行任务数量划分;
  3. 调用文件的getBlockSize方法,获取文件的块大小并存储在变量blockSize中;
  4. 调用computeSplitSize方法计算最后划分给每个任务的输入大小,并保存在splitSize中。计算公式为:splitSize = max(minSize, min(goalSize, blockSize));
  5. 将文件按照splitSize的大小进行划分,不足splitSize大小的也算作一个任务划分数。

总结

根据以上分析发现旧版mapreduce和新版mapreduce的FileIntputFormat关于map任务数量划分的实现逻辑不同,在对它们进行开发和性能优化时要特别注意。

原文发布于微信公众号 - CSDN技术头条(CSDN_Tech)

原文发表时间:2016-06-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏ml

HDUOJ------Worm

Worm Time Limit: 1000/1000 MS (Java/Others)    Memory Limit: 32768/32768 K (Java...

34780
来自专栏诸葛青云的专栏

教你利用Python把图片转字符画!代码哆啦A梦你见过嘛?

图片转字符画的关键是把图片的灰度值与自定义的字符集之间建立映射关系,不同区间的灰度值对应不同的字符,之后将图片每一个像素对应的字符打印出来,就是我们要的字符画。...

42140
来自专栏听雨堂

Pandas对行情数据的预处理

库里是过去抓取的行情数据,间隔6秒,每分钟8-10个数据不等,还有开盘前后的一些数据,用Pandas可以更加优雅地进行处理。 ? 需要把当前时间设置为index...

230100
来自专栏人工智能头条

在Apache Spark上跑Logistic Regression算法

22330
来自专栏恰童鞋骚年

设计模式的征途—8.桥接(Bridge)模式

在现实生活中,我们常常会用到两种或多种类型的笔,比如毛笔和蜡笔。假设我们需要大、中、小三种类型的画笔来绘制12中不同的颜色,如果我们使用蜡笔,需要准备3*12=...

14830
来自专栏fangyangcoder

基于交通灯数据集的端到端分类

抓住11月的尾巴,这里写上昨天做的一个DL的作业吧,作业很简单,基于交通灯的图像分类,但这确是让你从0构建深度学习系统的好例子,很多已有的数据集都封装好了,直接...

37930
来自专栏XAI

Java分布式神经网络库Deeplearning4j之上手实践手写数字图像识别与模型训练

环境的搭建可以参考另一篇文章。 Java分布式神经网络库Deeplearning4j 环境搭建和运行一个例子 代码所在包截图示意 ? 第一步运行MnistIm...

1.1K100
来自专栏小樱的经验随笔

2017年中国大学生程序设计竞赛-中南地区赛暨第八届湘潭市大学生计算机程序设计大赛题解&源码(A.高斯消元,D,模拟,E,前缀和,F,LCS,H,Prim算法,I,胡搞,J,树状数组)

A-------------------------------------------------------------------------------...

39370
来自专栏生信宝典

Pandas,让Python像R一样处理数据,但快

What is pandas Pandas是python中用于处理矩阵样数据的功能强大的包,提供了R中的dataframe和vector的操作,使得我们在使用p...

27250
来自专栏我的小碗汤

19个很有用的 ElasticSearch 查询语句 篇二

另一个结构化查询的例子是 范围查询。在这个例子中,我们要查找 2015 年出版的书。

1.7K30

扫码关注云+社区

领取腾讯云代金券