前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MapReduce快速入门系列(12) | MapReduce之OutputFormat

MapReduce快速入门系列(12) | MapReduce之OutputFormat

作者头像
不温卜火
发布2020-10-28 15:26:03
6540
发布2020-10-28 15:26:03
举报
文章被收录于专栏:不温卜火不温卜火

前面我们讲解了MapTask,ReduceTask和MapReduce运行机制。,那么这篇文章博主继续为大家讲解OutputFormat数据输出。

一. OutputFormat接口实现类

  OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面介绍几种常见的OutputFormat实现类。

1.1 文本输出TextOutputFormat

  默认的输出格式是TextOutFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toSTRING()方法把它们转换为字符串。

1.2 SequenceFileOutputFormat

  将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

1.3 自定义OutputFormat

  根据用户需求,自定义实现输出。

二. 自定义OutputFormat的使用场景和步骤

2.1 使用场景

  为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。   eg:要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

2.2 自定义OutputFormat步骤

1、自定义一个类继承FileOutputFormat。 2、改写RecordWriter,具体改写输出数据的方法write()。

三. 举例操作

3.1 需求

  过滤输入的log日志,包含buwenbuhuo的网站输出到d:/buwenbuhuo.log,不包含buwenbuhuo的网站输出到d:/other.log

1
1

3.2 需求分析

  1. 需求: 过滤输入的log日志,包含buwenbuhuo的网站输出到d:/buwenbuhuo.log,不包含buwenbuhuo的网站输出到d:/other.log
2
2

3.3 编写代码

  • 1. 自定义一个MyOutputFormat类
代码语言:javascript
复制
package com.buwenbuhuo.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * @author 卜温不火
 * @create 2020-04-25 16:37
 * com.buwenbuhuo.outputformat - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class MyOutputFormat extends FileOutputFormat<LongWritable, Text> {
    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        MyRecordWriter myRecordWriter = new MyRecordWriter();
        myRecordWriter.initialize(job);
        return myRecordWriter;
    }
}
  • 2. 编写MyRecordWriter类
代码语言:javascript
复制
package com.buwenbuhuo.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * @author 卜温不火
 * @create 2020-04-25 16:37
 * com.buwenbuhuo.outputformat - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class MyRecordWriter extends RecordWriter<LongWritable, Text> {


    private FSDataOutputStream buwenbuhuo;
    private FSDataOutputStream other;

    /**
     * 初始化方法
     * @param job
     */
    public void initialize(TaskAttemptContext job) throws IOException {
        String outdir = job.getConfiguration().get(FileOutputFormat.OUTDIR);
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());
        buwenbuhuo = fileSystem.create(new Path(outdir + "/buwenbuhuo.log"));
        other = fileSystem.create(new Path(outdir + "/other.log"));
    }


    /**
     * 将KV写出,每对KV调用一次
     * @param key
     * @param value
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void write(LongWritable key, Text value) throws IOException, InterruptedException {
        String out = value.toString() + "\n";
        if (out.contains("buwenbuhuo")) {
            buwenbuhuo.write(out.getBytes());
        } else {
            other.write(out.getBytes());
        }
    }

    /**
     * 关闭资源
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(buwenbuhuo);
        IOUtils.closeStream(other);
    }
}
  • 3. 编写OutputDriver类
代码语言:javascript
复制
package com.buwenbuhuo.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
 * @author 卜温不火
 * @create 2020-04-25 16:37
 * com.buwenbuhuo.outputformat - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class OutputDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(OutputDriver.class);

        job.setOutputFormatClass(MyOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("d:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

3.4 运行及结果

  • 1. 运行
3
3
  • 2.结果
4
4
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-04-29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. OutputFormat接口实现类
    • 1.1 文本输出TextOutputFormat
      • 1.2 SequenceFileOutputFormat
        • 1.3 自定义OutputFormat
        • 二. 自定义OutputFormat的使用场景和步骤
          • 2.1 使用场景
            • 2.2 自定义OutputFormat步骤
            • 三. 举例操作
              • 3.1 需求
                • 3.2 需求分析
                  • 3.3 编写代码
                    • 3.4 运行及结果
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档