前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MapReduce:N keys,N files(二)

MapReduce:N keys,N files(二)

作者头像
YG
发布2018-12-14 16:37:28
7510
发布2018-12-14 16:37:28
举报
文章被收录于专栏:YG小书屋YG小书屋

如果你看了MapReduce:N keys,N files(一)这篇文章,并按其介绍的方法尝试去将N个key映射到N的文件中,你会发现分割后数据量比分割前的要多,并且有些文件不能正常读取。

用presto读取的话,可能会报这种错:

Query 20181122_073113_31966_aeiaw failed: Error opening Hive split hdfs://xxx3/dt=20181122/uiappid=300046/20181122150918_100.110.30.239_SkgSHZT8 (offset=62576808, length=62576807): Protocol message tag had invalid wire type.

【问题现象】

问题的直观现象是MR输出的orc文件,presto认为是无效的,无法读取。但并不是每一次MR的输出都会产生这种无效文件,有时有,有时没有。

【尝试一】

初步怀疑是reduce保存当前所有reduce文件RecordWriter的map被回收了(Map<String,OrcMapreduceRecordWriter> map = new HashMap<String, OrcMapreduceRecordWriter>()),但又觉得不应该,因为OrcReNameMapreduceRecordWriter一直保持着引用关系。但还是将其调到了父类OrcReNameFileOutputFormat中。如下:

代码语言:javascript
复制
package is.split;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.orc.OrcFile;
import org.apache.orc.Writer;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class OrcReNameFileOutputFormat extends OrcOutputFormat {
    //保存各个key对应文件的writer
    static Map<String, OrcMapreduceRecordWriter> map = new HashMap<String, OrcMapreduceRecordWriter>(); //移到了父类中
        //private OrcMapreduceRecordWriter realWrite ;
    @Override
    public RecordWriter<Text, OrcStruct> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        return new OrcReNameMapreduceRecordWriter(taskAttemptContext);
    }


    private class OrcReNameMapreduceRecordWriter extends RecordWriter<Text, OrcStruct>{

        private TaskAttemptContext taskAttemptContext;

        public OrcReNameMapreduceRecordWriter(TaskAttemptContext taskAttemptContext){
            this.taskAttemptContext = taskAttemptContext;
        }

        //该函数接收的key是map阶段输出的key
        public void write(Text key, OrcStruct value) throws IOException, InterruptedException {
          //真正向文件中写数据的Writer,还是OrcMapreduceRecordWriter
            OrcMapreduceRecordWriter realWrite = map.get(key.toString());
            if (realWrite == null){
                //String outputFileName = taskAttemptContext.getConfiguration().get("ouputfile.prefix.col.name", "uiappid");
            
                String split_key =  key.toString();
                //输出路径文件夹,以该key为文件夹
                String outputDirPath = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR ) + "/" + split_key ;
                //输出路径文件名,文件名是根据当前时间戳和六位随机数生成的
                Path filename = new Path(new Path(outputDirPath), ISTool.getCurTime() + "_" + ISTool.getLocalIp() + "_" + RandomStringUtils.randomAlphanumeric(6) );
                Writer writer = OrcFile.createWriter(filename, org.apache.orc.mapred.OrcOutputFormat.buildOptions(taskAttemptContext.getConfiguration()));
                realWrite = new OrcMapreduceRecordWriter<OrcStruct>(writer);
                map.put(key.toString(), realWrite);
            }
            realWrite.write(NullWritable.get(), value);
        }

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (Map.Entry<String, OrcMapreduceRecordWriter> entry : map.entrySet()) {
                if (entry.getValue() != null){
                    entry.getValue().close(context);
                }

            }
        }
    }
}

尝试修改后没有效果。。。依旧是有时候生成不完整的orc文件,有时没有。

【尝试二】

网上没有找到答案。文件名中加上机器IP后发现单个key的文件会由两台机器分别生成。一个key不应该对应一个reduce吗?为啥一个key会生成两个文件。。难道reduce跑偏了??

在浏览 https://www.cnblogs.com/YDDMAX/p/6828363.html 这篇文章的时候,看到OutputCommitter 有个abortJob方法,突然灵光一闪,无效的orc文件是不是备用的reduce任务生成的?后面查看OutputCommitter的方法验证了猜想。

MR框架会对跑的慢的reduce任务起一个备份任务,两个同时跑。如果一个reduce一个输出的话不会出现这种问题,因为reduce的输出会写到一个临时文件,只有整个reduce跑成功之后,才会将该临时文件移动到指定的输出目录。任务跑成功后掉的是commitTask方法。可以看到,任务跑成功后,OutputCommitter会调用rename方法移动文件,如果algorithmVersion大于1,还会对生成的文件做合并。

代码语言:javascript
复制
  @Private
  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
      throws IOException {

    TaskAttemptID attemptId = context.getTaskAttemptID();
    if (hasOutputPath()) {
      context.progress();
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      FileStatus taskAttemptDirStatus;
      try {
        taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
      } catch (FileNotFoundException e) {
        taskAttemptDirStatus = null;
      }

      if (taskAttemptDirStatus != null) {
        if (algorithmVersion == 1) {
          Path committedTaskPath = getCommittedTaskPath(context);
          if (fs.exists(committedTaskPath)) {
             if (!fs.delete(committedTaskPath, true)) {
               throw new IOException("Could not delete " + committedTaskPath);
             }
          }
          if (!fs.rename(taskAttemptPath, committedTaskPath)) { //移动文件
            throw new IOException("Could not rename " + taskAttemptPath + " to "
                + committedTaskPath);
          }
          LOG.info("Saved output of task '" + attemptId + "' to " +
              committedTaskPath);
        } else {
          // directly merge everything from taskAttemptPath to output directory
          mergePaths(fs, taskAttemptDirStatus, outputPath);
          LOG.info("Saved output of task '" + attemptId + "' to " +
              outputPath);
        }
      } else {
        LOG.warn("No Output found for " + attemptId);
      }
    } else {
      LOG.warn("Output Path is null in commitTask()");
    }
  }

而如果kill掉任务,OutputCommitter会删除该任务产生的临时目录:

代码语言:javascript
复制
 public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
    if (hasOutputPath()) { 
      context.progress();
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      if(!fs.delete(taskAttemptPath, true)) {//删除临时文件
        LOG.warn("Could not delete "+taskAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in abortTask()");
    }
  }

回到我们这个程序,我是在reduce的过程中直接向指定的输出目录中写数据,如果reduce任务被kill了,其实数据已经写进去了,被kill的reduce产生的orc文件不是一个完整的orc文件,所以存在读数据格式问题。

【解决方案】:

将reduce任务备份执行的功能取消即可。

代码语言:javascript
复制
conf.set("mapreduce.reduce.speculative","false");
conf.set("mapreduce.map.speculative","false");
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.11.22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 【问题现象】
  • 【尝试一】
  • 【尝试二】
  • 【解决方案】:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档