MapReduce中map并行度优化及源码分析

mapTask并行度的决定机制

  一个job的map阶段并行度由客户端在提交job时决定,而客户端对map阶段并行度的规划的基本逻辑为:将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理。

FileInputFormat切片机制

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6733968.html

1、默认切片定义在InputFormat类中的getSplit()方法

2、FileInputFormat中默认的切片机制:

a) 简单地按照文件的内容长度进行切片

b) 切片大小,默认等于hdfs的block大小

c) 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

比如待处理数据有两个文件:

file1.txt    260M
file2.txt    10M

经过FileInputFormat的切片机制运算后,形成的切片信息如下:  

file1.txt.split1--  0~128
file1.txt.split2--  128~260 //如果剩余的文件长度/切片长度<=1.1则会将剩余文件的长度并未一个切片
file2.txt.split1--  0~10M

3、FileInputFormat中切片的大小的参数配置

通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由这几个值来运算决定。

minsize:默认值:1  
   配置参数: mapreduce.input.fileinputformat.split.minsize    

maxsize:默认值:Long.MAXValue  
    配置参数:mapreduce.input.fileinputformat.split.maxsize

blocksize:值为hdfs的对应文件的blocksize

配置读取目录下文件数量的线程数:public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";

因此,默认情况下,Math.max(minSize, Math.min(maxSize, blockSize));切片大小=blocksize

maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小。

minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。

选择并发数的影响因素:

1、运算节点的硬件配置

2、运算任务的类型:CPU密集型还是IO密集型

3、运算任务的数据量

3、hadoop2.6.4源码解析

org.apache.hadoop.mapreduce.JobSubmitter类

   //得到job的map任务的并行数量
   private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
  
  @SuppressWarnings("unchecked")
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
     ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
   
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

切片计算逻辑,关注红色字体代码即可。

public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();

    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
   //遍历文件,对每一个文件进行如下处理:获得文件的blocksize,获取文件的长度,得到切片信息(spilt 文件路径,切片编号,偏移量范围)
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
         long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }
 public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";
  
  public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";
    
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    
  //保证切分的文件长度最小不得小于1字节
  protected long getFormatMinSplitSize() {
    return 1;
  }
  
  //如果没有在conf中设置SPLIT_MINSIZE参数,则取默认值1字节。
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }
  
  //得到切片文件的最大长度
  long maxSize = getMaxSplitSize(job);
  
  //如果没有在conf中设置SPLIT_MAXSIZE参数,则去默认值Long.MAX_VALUE字节。
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }
  
   //读取指定目录下的所有文件的信息
   List<FileStatus> files = listStatus(job);
   //如果没有指定开启几个线程读取,则默认一个线程去读文件信息,因为存在目录下有上亿个文件的情况,所以有需要开启多个线程加快读取。
   int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
   public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
   public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
  
  //计算切片文件的逻辑大小
  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
  
  private static final double SPLIT_SLOP = 1.1;   // 10% slop
  //判断剩余文件与切片大小的比是否为1.1.
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
    }

map并行度

  如果job的每个map或者reduce的task的运行时间都只有30-40秒钟(最好每个map的执行时间最少不低于一分钟),那么就减少该job的map或者reduce数。每一个task的启动和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。

  配置task的JVM重用可以改善该问题:

  (mapred.job.reuse.jvm.num.tasks,默认是1,表示一个JVM上最多可以顺序执行的task数目(属于同一个Job)是1。也就是说一个task启一个JVM)。

小文件的场景下,默认的切片机制会造成大量的maptask处理很少量的数据,效率低下:

解决方案:

  推荐:把小文件存入hdfs之前进行预处理,先合并为大文件后再上传。

  折中:写程序对hdfs上小文件进行合并再跑job处理。

  补救措施:如果大量的小文件已经存在hdfs上了,使用combineInputFormate组件,它可以将众多的小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个maptask操作了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏腾讯数据库技术

时间精度引起MySQL主从不一致问题剖析

1. 主从数据不一致          近日接报某实例一个datetime字段主从数据不一致,其它数据暂未发现异常。第一反应可能是人为修改,如果用户有高权限帐...

2652
来自专栏岑玉海

Carbondata源码系列(一)文件生成过程

在滴滴的两年一直在加班,人也变懒了,就很少再写博客了,最近在进行Carbondata和hive集成方面的工作,于是乎需要对Carbondata进行深入的研究。 ...

6206
来自专栏Java Web

JavaFX-TableView详解

前言 最近在着手一个学生管理系统的编写,涉及到TableView的使用,这前前后后的也有了些经验和想法想要记录和分享一下(事实上我正在想要用html网页代替界面...

3936
来自专栏JackieZheng

Spring读书笔记——bean加载

我们的日常开发几乎离不开Spring,他为我们的开发带来了很大的便捷,那么Spring框架是如何做到方便他人的呢。今天就来说说bean如何被加载加载。 我们在x...

2199
来自专栏分布式系统进阶

Librdkafka的操作处理队列

2732
来自专栏琯琯博客

Yii2 学习笔记之助手类

2857
来自专栏xingoo, 一个梦想做发明家的程序员

【插件开发】—— 9 编辑器代码分块着色-高亮显示!

前文回顾: 1 插件学习篇 2 简单的建立插件工程以及模型文件分析 3 利用扩展点,开发透视图 4 SWT编程须知 5 SWT简单控件的使用与布局搭...

2836
来自专栏大前端_Web

easyUI组件datagrid的二次封装

版权声明:本文为吴孔云博客原创文章,转载请注明出处并带上链接,谢谢。 https://blog.csdn.net/wkyseo/articl...

2593
来自专栏君赏技术博客

HQ移动20170317期周报

删除缓存:rm ~/Library/Caches/CocoaPods/search_index.json

1103
来自专栏Java成神之路

Java企业微信开发_07_JSSDK多图上传

 所有的JS接口只能在企业微信应用的可信域名下调用(包括子域名),可在企业微信的管理后台“我的应用”里设置应用可信域名。这个域名必须要通过ICP备案,不然jss...

1882

扫码关注云+社区

领取腾讯云代金券