前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hadoop使用学习笔记(2)

Hadoop使用学习笔记(2)

作者头像
干货满满张哈希
发布2021-04-12 16:21:45
3760
发布2021-04-12 16:21:45
举报

Hadoop使用学习笔记

2. 基本Map-Reduce工作配置与原理(上)

我们假设MapReduce任务为统计所有文件中每个词语出现次数

整个MapReduce流程主要如下所示,可以分为四步:

这里写图片描述
这里写图片描述

我们将统计所有文件中每个词语出现次数拆分成为:

  1. 文件输入转换成Map工作可处理的键值对(后面我们会知道是以文件位置为key,文件内容为value)
  2. Map:提取上一步value中的所有词语,生成以词语为key,value为1的键值对
  3. Reduce:统计每个词语出现的个数,转换成以词语为key,value为出现次数的键值对
  4. 输出上一步的输出到文件

Input是将输入(比如数据库,网络,文件等)转化为Hadoop可以处理的标准输入。这里我们拿文件输入举例,假设我们有如下两个文件作为输入流:

这里写图片描述
这里写图片描述

Hadoop会将它们转化成什么呢?我们看下Hadoop的源码,针对文件输入,Hadoop中有如下类:

这里写图片描述
这里写图片描述

Hadoop会将过大的文件拆分。在HDFS中,文件被分成多个Block,并且每个Block会被保存多份。在用HDFS的文件作为输入时,我们需要获取文件有多少个Block以及每个Block位于哪个datanode上。获取到文件后,按照合适的规则以及map任务数量,分割成多个输入文件。有个makeSplit方法就是将文件输入转成一个一个块:

代码语言:javascript
复制
protected FileSplit makeSplit(Path file, long start, long length,
                                  String[] hosts, String[] inMemoryHosts) {
        return new FileSplit(file, start, length, hosts, inMemoryHosts);
    }

我们可以看出,每个FileSplit包括:

  • file:文件
  • start:该FileSplit在file中的起始字节位置
  • length:该FileSplit的字节长度
  • hosts和inMemoryHosts:这个我们之后在HDFS部分会详细描述,这里我们就理解成file所处的datanode和缓存node就可以

下面代码展示究竟是如何拆分的。

代码语言:javascript
复制
//job: MapReduce的配置, numSplits一般等于或者大于Map任务的个数,因为每个Map任务至少要处理一个split
public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
    //计时用
    StopWatch sw = new StopWatch().start();
    FileStatus[] files = listStatus(job);

    //保存文件个数
    job.setLong(NUM_INPUT_FILES, files.length);
    //计算所有文件大小,遇到非文件(文件夹)抛异常
    long totalSize = 0;
    for (FileStatus file : files) {
        if (file.isDirectory()) {
            throw new IOException("Not a file: " + file.getPath());
        }
        totalSize += file.getLen();
    }
    //用总大小除以numSplits获取每个Map任务处理文件理想大小
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    //最小大小为1(minSplitSize)与配置中mapreduce.input.fileinputformat.split.minsize的最大值
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
            FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    //生成splits
    ArrayList splits = new ArrayList(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file : files) {
        Path path = file.getPath();
        long length = file.getLen();
        //长度不为0,获取File的HDFS信息
        if (length != 0) {
            //获取文件的HDFS信息,文件Block分布的Hosts和缓存Hosts
            FileSystem fs = path.getFileSystem(job);
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            //如果可以拆分
            if (isSplitable(fs, path)) {
                long blockSize = file.getBlockSize();
                //取goalSize与 minSize和blockSize的最小值 中的大的那个
                long splitSize = computeSplitSize(goalSize, minSize, blockSize);

                long bytesRemaining = length;
                //如果剩余大于splitSize的1.1倍则继续拆分
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                            length - bytesRemaining, splitSize, clusterMap);
                    splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                            splitHosts[0], splitHosts[1]));
                    bytesRemaining -= splitSize;
                }
                //剩余不为0,则将剩下的组成一个Split
                if (bytesRemaining != 0) {
                    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                            - bytesRemaining, bytesRemaining, clusterMap);
                    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                            splitHosts[0], splitHosts[1]));
                }
            } else {
                String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
                splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
            }
        }
        //长度为0,直接用空host生成FileSplits
        else {
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    //计时结束
    sw.stop();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits.toArray(new FileSplit[splits.size()]);
}

所以,经过Input这一步,我们得到了:

这里写图片描述
这里写图片描述

除了文件输入,Hadoop中还有其他输入:

这里写图片描述
这里写图片描述

比如DB输入DBInputFormat,常用的还是FileInputFormat,因为大部分MapReduce job都基于HDFS。FileInputFormat默认的实现类是TextInputFormat,就是纯文本输入,也是我们这里的例子使用的。 Map阶段的输入是TextInputFormat,之前的FileSplit会经过如下方法的处理:

代码语言:javascript
复制
public RecordReader getRecordReader(
                                        InputSplit genericSplit, JobConf job,
                                        Reporter reporter)
  throws IOException {

  reporter.setStatus(genericSplit.toString());
  String delimiter = job.get("textinputformat.record.delimiter");
  byte[] recordDelimiterBytes = null;
  if (null != delimiter) {
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  }
  return new LineRecordReader(job, (FileSplit) genericSplit,
      recordDelimiterBytes);
}

LineRecordReader的next方法会在各个工作节点被调用,生成LongWritable类型的key和Text类型的value的键值对输入:

代码语言:javascript
复制
public synchronized boolean next(LongWritable key, Text value)
    throws IOException {

    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      key.set(pos);
      //
      int newSize = 0;
      if (pos == 0) {
        newSize = skipUtfByteOrderMark(value);
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if (newSize == 0) {
        return false;
      }
      if (newSize < maxLineLength) {
        return true;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
    }

    return false;
  }

通过上面源码可以看出:key为文件起始位置,value为文件内容。

这里写图片描述
这里写图片描述

每个Map任务接受(LongWritable->Text)为输入,输出为(Text->IntWritable)即(词语->1)。 之后进入Reduce,hadoop框架中会将Map的输出在Reduce步骤进行第一步的聚合,我们从ReduceTask类的runOldReducer方法中可以知道:

代码语言:javascript
复制
private 
  void runOldReducer(JobConf job,
                     TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator comparator,
                     Class keyClass,
                     Class valueClass) throws IOException {
    Reducer reducer = 
      ReflectionUtils.newInstance(job.getReducerClass(), job);
    // make output collector
    String finalName = getOutputName(getPartition());

    RecordWriter out = new OldTrackingRecordWriter(
        this, job, reporter, finalName);
    final RecordWriter finalOut = out;

    OutputCollector collector = 
      new OutputCollector() {
        public void collect(OUTKEY key, OUTVALUE value)
          throws IOException {
          finalOut.write(key, value);
          // indicate that progress update needs to be sent
          reporter.progress();
        }
      };

    // apply reduce function
    try {
      //increment processed counter only if skipping feature is enabled
      boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
        SkipBadRecords.getAutoIncrReducerProcCount(job);
      //生成同一个keys的所有values的Iterator数据结构
      ReduceValuesIterator values = isSkipping() ? 
          new SkippingReduceValuesIterator(rIter, 
              comparator, keyClass, valueClass, 
              job, reporter, umbilical) :
          new ReduceValuesIterator(rIter, 
          job.getOutputValueGroupingComparator(), keyClass, valueClass, 
          job, reporter);
      values.informReduceProgress();
      while (values.more()) {
        reduceInputKeyCounter.increment(1);
        //调用用户实现的reducer的reduce方法进行实际的reduce
        reducer.reduce(values.getKey(), values, collector, reporter);
        if(incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
              SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
        }
        values.nextKey();
        values.informReduceProgress();
      }

      reducer.close();
      reducer = null;

      out.close(reporter);
      out = null;
    } finally {
      IOUtils.cleanup(LOG, reducer);
      closeQuietly(out, reporter);
    }
  }

可以总结,hadoop在到了reduce阶段,首先隐式的将Map输出进行归类。

这里写图片描述
这里写图片描述

注意,这里apple对应的Iterator有三个1,包含所有的Map的输出,所以,我们知道,只有所有的Map执行完后,reduce才会开始。 之后,reduce统计每个词出现次数,输出以词语为key,value为出现次数的键值对。

这里写图片描述
这里写图片描述

最后,将结果写入文件:

这里写图片描述
这里写图片描述

这样,一个完整的流程就展示完了。下一篇我们将写这个任务的源代码,配置本地提交任务至远程Hadoop集群。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-08-03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Hadoop使用学习笔记
    • 2. 基本Map-Reduce工作配置与原理(上)
    相关产品与服务
    大数据
    全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档