前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MapReduce快速入门系列(9) | Shuffle之Combiner合并

MapReduce快速入门系列(9) | Shuffle之Combiner合并

作者头像
不温卜火
发布2020-10-28 15:28:24
6090
发布2020-10-28 15:28:24
举报
文章被收录于专栏:不温卜火

Hello,大家好!博主上篇讲解了分区,这篇要讲的是合并操作。如何讲解这个章节呢?首先先对什么是合并进行解释,然后通过案例进行证明。

一. Combiner合并的简单介绍

1
1

今天我们讲的是Shuffle中的第七步

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一。

  • 1. Combiner是MR程序中Mapper和Reducer之外的一种组件。
  • 2. Combiner组件的父类就是Reducer。
  • 3. Combiner和Reducer的区别在于运行的位置 Combiner是在每一个MapTask所在的节点运行; Reducer是接收全局所有Mapper的输出结果;
  • 4. Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量。

二. 通过图片了解使用Combiner和不使用的区别

  • 1. 未使用combiner的网络开销
2
2
  • 2. 使用combiner的网络开销
3
3

  可以很明显的看出在combiner阶段,通过合并同一个区中相同key的value值,减小了后续的数据传输,从而提高了网络的io!

  但在MapReduce中,combiner是默认不开启的。为什么呢?是因为数据合并并不适用所有的业务需求,如果是计算个数,求和combiner还能发挥它的优势!但如果是求平均数,combiner必不可免的会影响到最终的结果,使结果变得不可靠!所以当我们需要到combiner时,需要手动开启。

  • 3. 自定义Combiner实现步骤 ①自定义一个Combiner继承Reducer,重写Reduce方法
代码语言:javascript
复制
public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

        // 1 汇总操作
		int count = 0;
		for(IntWritable v :values){
			count += v.get();
		}

        // 2 写出
		context.write(key, new IntWritable(count));
	}
}

②在Job驱动类中设置: job.setCombinerClass(WordcountCombiner.class);

三. 代码实现

注:用于对比的程序源代码为《MapReduce系列(2) | 统计输出给定的文本文档每一个单词出现的总次数》中的源代码,有想进行对比的同学,可以自行复制创建对比(其实本源码就比源代码多一行)。

3.1 编写Mapper类

代码语言:javascript
复制
package com.buwenbuhuo.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author 卜温不火
 * @create 2020-04-22 21:24
 * com.buwenbuhuo.wordcount - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 获取一行
        String line = value.toString();

        // 2 切割
        String[] words = line.split(" ");

        // 3 输出
        for (String word : words) {

            k.set(word);
            context.write(k, v);
        }
    }
}

3.2 编写Reducer类

代码语言:javascript
复制
package com.buwenbuhuo.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


/**
 * @author 卜温不火
 * @create 2020-04-22 21:24
 * com.buwenbuhuo.wordcount - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

        // 1 累加求和
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // 2 输出
        v.set(sum);
        context.write(key,v);
    }
}

3.3 编写Driver驱动类

代码语言:javascript
复制
package com.buwenbuhuo.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author 卜温不火
 * @create 2020-04-22 21:24
 * com.buwenbuhuo.wordcount - the name of the target package where the new class or interface will be created.
 * mapreduce0422 - the name of the current project.
 */
public class WcDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1 获取配置信息以及封装任务
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 设置jar加载路径
        job.setJarByClass(WcDriver.class);

        // 3 设置map和reduce类
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        // 4 设置map输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
		
		//
		//  仅此一行添加
		job.setCombinerClass(WcReducer.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

四. 对比及结论

  • 1. 对比
5
5
  • 2. Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

本次的分享就到这里了,大家有什么疑惑或者好的建议可以在评论区积极留言。受益的小伙伴们不要忘了点赞关注我呀!!!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/04/28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. Combiner合并的简单介绍
  • 二. 通过图片了解使用Combiner和不使用的区别
  • 三. 代码实现
    • 3.1 编写Mapper类
      • 3.2 编写Reducer类
        • 3.3 编写Driver驱动类
        • 四. 对比及结论
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档