MapReduce:N keys,N files(二)

如果你看了MapReduce:N keys,N files(一)这篇文章,并按其介绍的方法尝试去将N个key映射到N的文件中,你会发现分割后数据量比分割前的要多,并且有些文件不能正常读取。 用presto读取的话,可能会报这种错:

Query 20181122_073113_31966_aeiaw failed: Error opening Hive split hdfs://xxx3/dt=20181122/uiappid=300046/20181122150918_100.110.30.239_SkgSHZT8 (offset=62576808, length=62576807): Protocol message tag had invalid wire type.

【问题现象】

问题的直观现象是MR输出的orc文件,presto认为是无效的,无法读取。但并不是每一次MR的输出都会产生这种无效文件,有时有,有时没有。

【尝试一】

初步怀疑是reduce保存当前所有reduce文件RecordWriter的map被回收了(Map<String,OrcMapreduceRecordWriter> map = new HashMap<String, OrcMapreduceRecordWriter>()),但又觉得不应该,因为OrcReNameMapreduceRecordWriter一直保持着引用关系。但还是将其调到了父类OrcReNameFileOutputFormat中。如下:

package is.split;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.orc.OrcFile;
import org.apache.orc.Writer;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class OrcReNameFileOutputFormat extends OrcOutputFormat {
    //保存各个key对应文件的writer
    static Map<String, OrcMapreduceRecordWriter> map = new HashMap<String, OrcMapreduceRecordWriter>(); //移到了父类中
        //private OrcMapreduceRecordWriter realWrite ;
    @Override
    public RecordWriter<Text, OrcStruct> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        return new OrcReNameMapreduceRecordWriter(taskAttemptContext);
    }


    private class OrcReNameMapreduceRecordWriter extends RecordWriter<Text, OrcStruct>{

        private TaskAttemptContext taskAttemptContext;

        public OrcReNameMapreduceRecordWriter(TaskAttemptContext taskAttemptContext){
            this.taskAttemptContext = taskAttemptContext;
        }

        //该函数接收的key是map阶段输出的key
        public void write(Text key, OrcStruct value) throws IOException, InterruptedException {
          //真正向文件中写数据的Writer,还是OrcMapreduceRecordWriter
            OrcMapreduceRecordWriter realWrite = map.get(key.toString());
            if (realWrite == null){
                //String outputFileName = taskAttemptContext.getConfiguration().get("ouputfile.prefix.col.name", "uiappid");
            
                String split_key =  key.toString();
                //输出路径文件夹,以该key为文件夹
                String outputDirPath = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR ) + "/" + split_key ;
                //输出路径文件名,文件名是根据当前时间戳和六位随机数生成的
                Path filename = new Path(new Path(outputDirPath), ISTool.getCurTime() + "_" + ISTool.getLocalIp() + "_" + RandomStringUtils.randomAlphanumeric(6) );
                Writer writer = OrcFile.createWriter(filename, org.apache.orc.mapred.OrcOutputFormat.buildOptions(taskAttemptContext.getConfiguration()));
                realWrite = new OrcMapreduceRecordWriter<OrcStruct>(writer);
                map.put(key.toString(), realWrite);
            }
            realWrite.write(NullWritable.get(), value);
        }

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (Map.Entry<String, OrcMapreduceRecordWriter> entry : map.entrySet()) {
                if (entry.getValue() != null){
                    entry.getValue().close(context);
                }

            }
        }
    }
}

尝试修改后没有效果。。。依旧是有时候生成不完整的orc文件,有时没有。

【尝试二】

网上没有找到答案。文件名中加上机器IP后发现单个key的文件会由两台机器分别生成。一个key不应该对应一个reduce吗?为啥一个key会生成两个文件。。难道reduce跑偏了?? 在浏览 https://www.cnblogs.com/YDDMAX/p/6828363.html 这篇文章的时候,看到OutputCommitter 有个abortJob方法,突然灵光一闪,无效的orc文件是不是备用的reduce任务生成的?后面查看OutputCommitter的方法验证了猜想。 MR框架会对跑的慢的reduce任务起一个备份任务,两个同时跑。如果一个reduce一个输出的话不会出现这种问题,因为reduce的输出会写到一个临时文件,只有整个reduce跑成功之后,才会将该临时文件移动到指定的输出目录。任务跑成功后掉的是commitTask方法。可以看到,任务跑成功后,OutputCommitter会调用rename方法移动文件,如果algorithmVersion大于1,还会对生成的文件做合并。

  @Private
  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
      throws IOException {

    TaskAttemptID attemptId = context.getTaskAttemptID();
    if (hasOutputPath()) {
      context.progress();
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      FileStatus taskAttemptDirStatus;
      try {
        taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
      } catch (FileNotFoundException e) {
        taskAttemptDirStatus = null;
      }

      if (taskAttemptDirStatus != null) {
        if (algorithmVersion == 1) {
          Path committedTaskPath = getCommittedTaskPath(context);
          if (fs.exists(committedTaskPath)) {
             if (!fs.delete(committedTaskPath, true)) {
               throw new IOException("Could not delete " + committedTaskPath);
             }
          }
          if (!fs.rename(taskAttemptPath, committedTaskPath)) { //移动文件
            throw new IOException("Could not rename " + taskAttemptPath + " to "
                + committedTaskPath);
          }
          LOG.info("Saved output of task '" + attemptId + "' to " +
              committedTaskPath);
        } else {
          // directly merge everything from taskAttemptPath to output directory
          mergePaths(fs, taskAttemptDirStatus, outputPath);
          LOG.info("Saved output of task '" + attemptId + "' to " +
              outputPath);
        }
      } else {
        LOG.warn("No Output found for " + attemptId);
      }
    } else {
      LOG.warn("Output Path is null in commitTask()");
    }
  }

而如果kill掉任务,OutputCommitter会删除该任务产生的临时目录:

 public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
    if (hasOutputPath()) { 
      context.progress();
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      if(!fs.delete(taskAttemptPath, true)) {//删除临时文件
        LOG.warn("Could not delete "+taskAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in abortTask()");
    }
  }

回到我们这个程序,我是在reduce的过程中直接向指定的输出目录中写数据,如果reduce任务被kill了,其实数据已经写进去了,被kill的reduce产生的orc文件不是一个完整的orc文件,所以存在读数据格式问题。

【解决方案】:

将reduce任务备份执行的功能取消即可。

conf.set("mapreduce.reduce.speculative","false");
conf.set("mapreduce.map.speculative","false");

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏IT米粉

你必须了解的反射——反射来实现实体验证

日常开发,都是通过API进行前后端的系统对接,对API参数的验证是一个使用率非常高的功能,如果能非常简便的的进行参数验证,能降低代码量,提升工作效率。

4078
来自专栏菩提树下的杨过

利用sharding-jdbc分库分表

sharding-jdbc是当当开源的一款分库分表的数据访问层框架,能对mysql很方便的分库、分表,基本不用修改原有代码,只要配置一下即可,完整的配置参考以下...

5567
来自专栏Jackson0714

02.你真的知道线程安全的“单件模式”吗?

2867
来自专栏Java学习网

高性能Java解析器实现过程详解

高性能Java解析器实现过程详解 如果你没有指定数据或语言标准的或开源的Java解析器, 可能经常要用Java实现你自己的数据或语言解析器。或者,可能有很多解析...

4326
来自专栏腾讯Bugly的专栏

iOS: ARM64不定函数传参问题调试剖析

| 导语  ABI(Application Binary Interface)描述了应用程序和OS之间的底层接口。其中,通过查阅调用约定(Calling Co...

3552
来自专栏逆向技术

16位汇编第八讲指令第四讲

        16位汇编第八讲指令第四讲 一丶串操作类指令 1.什么是串操作?   1.串操作指令是8086指令系统中比较独特的一类指令,采用比较特殊的数据串...

2046
来自专栏Java Web

Java 面试知识点解析(四)——版本特性篇(1)

在遨游了一番 Java Web 的世界之后,发现了自己的一些缺失,所以就着一篇深度好文:知名互联网公司校招 Java 开发岗面试知识点解析 ,来好好的对 Jav...

4946
来自专栏移动端开发

iOS 多线程之线程锁Swift-Demo示例总结

线程锁是什么       在前面的文章中总结过多线程,总结了多线程之后,线程锁也是必须要好好总结的东西,这篇文章构思的时候可能写的东西得许多,只能挤时间一点点的...

6628
来自专栏小尘哥的专栏

【springboot+easypoi】一行代码搞定简单的word导出

2834
来自专栏nnngu

记录某公司(简称SMKJ) 的一次面试

昨天去了一家公司面试 Java 开发岗位,这篇文章主要是做一个面试的记录以及总结。 这家公司的规模大概100-200人,环境还可以,在一栋大厦租了两层办公室(3...

3276

扫码关注云+社区

领取腾讯云代金券