前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >几个关于MapReduce的小例子

几个关于MapReduce的小例子

作者头像
java技术爱好者
发布2022-03-15 14:02:42
3770
发布2022-03-15 14:02:42
举报
文章被收录于专栏:java技术爱好者java技术爱好者

文章已收录到我的Github精选,欢迎Star:https://github.com/yehongzhi/learningSummary

写在前面

上一篇文章通过写一个WordCount学习了MapReduce的入门操作,那么这篇文章继续通过多一些例子来学习MapReduce。下面介绍几种比较常见的操作:排序,去重,求和,求平均数,TopK查询(查询排名前K名的记录)

排序

其实MapReduce会默认对Key进行升序自然排序,这显然是远远不够用的,下面我举个例子,输入的file1内容如下:

代码语言:javascript
复制
1,256
1,12
3,283
4,478
2,1001
2,3600
1,4
5,78
2,33

file2内容如下:

代码语言:javascript
复制
5,10
3,598
4,654
1,741
2,123
3,850
2,11568
1,12574

我们要的结果是根据第一个数字进行排序,如果第一个数字相同,则根据第二个数字排序,怎么玩呢?

首先我们得创建一个自定义的类,里面包括两个字段表示一行里面的第一个值和第二个值,接着实现序列化,反序列化方法,最重要是比较方法。

代码语言:javascript
复制
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name ComparableKey
 * @date 2022-01-13 23:58
 **/
public class ComparableKey implements WritableComparable<ComparableKey> {
 //一行内容的第一个值
    private long firstNum;
 //第二个值
    private long secondNum;

    public ComparableKey() {
    }

    public ComparableKey(long firstNum, long secondNum) {
        this.firstNum = firstNum;
        this.secondNum = secondNum;
    }

    public long getFirstNum() {
        return firstNum;
    }

    public void setFirstNum(long firstNum) {
        this.firstNum = firstNum;
    }

    public long getSecondNum() {
        return secondNum;
    }

    public void setSecondNum(long secondNum) {
        this.secondNum = secondNum;
    }

    @Override
    public int compareTo(ComparableKey otherComparableKey) {
        //如果第一位数相等,则比较第二位数,从小到大排序
        if (firstNum == otherComparableKey.getFirstNum()) {
            //返回大于0的数表示前面的大于后面的,小于0则表示前面的数小于后面的数
            return (int) (secondNum - otherComparableKey.getSecondNum());
        } else {
            //返回大于0的数表示前面的大于后面的,小于0则表示前面的数小于后面的数
            return (int) (firstNum - otherComparableKey.getFirstNum());
        }
    }

    //序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(firstNum);
        dataOutput.writeLong(secondNum);
    }

    //反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        firstNum = dataInput.readLong();
        secondNum = dataInput.readLong();
    }
}

接着写Mapper,输入类型是Text,转换为自定义的ComparableKey类型,会自动调compareTo()方法进行比较排序。

代码语言:javascript
复制
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name NumberSortMapper
 * @date 2022-01-11 23:56
 **/

/**
 * Mapper有四个泛型参数需要填写
 * 第一个参数KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,类型为LongWritable
 * 第二个参数VALUEIN:默认情况下,是mr框架所读的一行文本的内容,类型为Text
 * 第三个参数KEYOUT:是逻辑处理完成之后输出数据的key,使用自定义的类型ComparableKey
 * 第四个参数VALUEOUT:是逻辑处理完成之后输出数据的value,在此处是次数,类型为NullWritable
 */
public class NumberSortMapper extends Mapper<LongWritable, Text, ComparableKey, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strings = value.toString().split(",");
        long firstNum = Long.parseLong(strings[0]);
        long secondNum = Long.parseLong(strings[1]);
        ComparableKey comparableKey = new ComparableKey(firstNum, secondNum);
        context.write(comparableKey, NullWritable.get());
    }
}

Mapper已经做了排序,那么Reduce层就只需要取出来就行了。

代码语言:javascript
复制
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name NumberSortReduce
 * @date 2022-01-14 00:18
 **/
public class NumberSortReduce extends Reducer<ComparableKey, NullWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(ComparableKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(key.getFirstNum()), new LongWritable(key.getSecondNum()));
    }
}

最后再写个Main方法,作为入口:

代码语言:javascript
复制
public class NumberSort {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(NumberSort.class);
        job.setMapperClass(NumberSortMapper.class);
        job.setReducerClass(NumberSortReduce.class);
        job.setMapOutputKeyClass(ComparableKey.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

接着把实验的文件上传上去hadoop的number_sort文件夹(自己创建的目录)。然后再执行任务,使用命令:

代码语言:javascript
复制
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar NumberSort number_sort number_sort_output

执行成功后,效果如下:

代码语言:javascript
复制
1 4
1 12
1 256
1 741
1 12574
2 33
2 123
2 1001
2 3600
2 11568
3 283
3 598
3 850
4 478
4 654
5 10
5 78

去重

比如以下的这个文本,单词去重,怎么做呢?

代码语言:javascript
复制
hadoop is good
hadoop is so good
java is great
java and hadoop is very good

其实很简单,因为MapReduce输出的类型就是Map,Map的特性就是Key不能重复,于是乎我们可以把值想要去重的值放入Key,Value设置为NULL就完事了。Mapper步骤如下:

代码语言:javascript
复制
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text keyOut = new Text();
        String[] strings = value.toString().split(" ");
        for (String str : strings) {
            keyOut.set(str);
            context.write(keyOut,NullWritable.get());
        }
    }
}

Reduce步骤不需要做其他操作,直接取值即可。

代码语言:javascript
复制
public class DistinctReduce extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

再加个入口Main方法。

代码语言:javascript
复制
public class DistinctMain {
    public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(DistinctMain.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

把测试数据上传到hadoop上面。

然后执行命令如下:

代码语言:javascript
复制
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar DistinctMain distinct distinct_output

输出结果如下:

代码语言:javascript
复制
and
good
great
hadoop
is
java
so
very

去重完成。

求和

比如有一道很经典的数学题,对1到100进行求和,如果用笔算很简单,可以用首尾相加法,1加99,2加98...以此类推。但是用MapReduce怎么做呢?

代码语言:javascript
复制
1
2
3
4
...
98
99
100

我们需要使用cleanup()方法,这个方法是在map方法执行完之后执行,只执行一次,看源码就明白了。

代码语言:javascript
复制
//一般是啥事都不干,子类可以实现该方法做一些自己的事情
protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}

public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    this.setup(context);
    try {
        while(context.nextKeyValue()) {
            this.map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        //执行完map方法后,执行cleanup()方法
        this.cleanup(context);
    }
}

那么问题就很简单了,Mapper实现代码如下:

代码语言:javascript
复制
public class SumMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
    private long sum = 0L;
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        long val = Long.parseLong(value.toString());
        sum += val;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(sum), NullWritable.get());
    }
}

Reduce实现代码如下:

代码语言:javascript
复制
public class SumReduce extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

    private long sum = 0L;

    @Override
    protected void reduce(LongWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        sum += key.get();
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(sum), NullWritable.get());
    }
}

Main方法入口:

代码语言:javascript
复制
public class SumMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(SumMain.class);
        job.setMapperClass(SumMapper.class);
        job.setReducerClass(SumReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

打包成jar包,上传到服务器,然后把包含1到100文本上传到HDFS,执行命令跑任务:

代码语言:javascript
复制
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar SumMain sum_main.txt sum_main_out

输出结果如下:

代码语言:javascript
复制
5050

求平均数

求平均数也是很常见的操作,比如有一大堆随机生成的数字,求出平均数:

代码语言:javascript
复制
10
25
22
78
119
88
56
32
29
25

求平均数的思路其实就是总和除以个数,所以Mapper阶段的输出就是<key,1>,代码如下:

代码语言:javascript
复制
public class AverageMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(Long.parseLong(value.toString())), new IntWritable(1));
    }
}

第二步Reduce步骤就利用cleanup()计算平均数,计算前先计数,求和,代码如下:

代码语言:javascript
复制
public class AverageReduce extends Reducer<LongWritable, IntWritable, Text, NullWritable> {

    private long sum = 0L;

    private long count = 0L;

    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int i = 0;
        for (IntWritable value : values) {
            i += value.get();
        }
        sum += (key.get() * i);
        count += i;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        BigDecimal sumBigDecimal = new BigDecimal(sum);
        BigDecimal countBigDecimal = new BigDecimal(count);
        BigDecimal result = sumBigDecimal.divide(countBigDecimal, 2, RoundingMode.HALF_UP);
        context.write(new Text(result.toString()), NullWritable.get());
    }
}

入口Main类如下:

代码语言:javascript
复制
public class AverageMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(AverageMain.class);
        job.setMapperClass(AverageMapper.class);
        job.setReducerClass(AverageReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

接着还是老套路,打包上传jar包和测试用的文件,接着执行命令跑任务:

代码语言:javascript
复制
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar AverageMain average_main.txt average_main_out

输出结果如下:

代码语言:javascript
复制
48.40

TopK查询

假设下面的文本,是单词以及单词出现的次数,要找出出现次数TOP5的单词,怎么做呢?

代码语言:javascript
复制
c++ 12
redis 45
java 120
Python 50
JavaScript 41
GoLang 30
Spring 30
Mybatis 11
Hibernate 6
RabbitMQ 64
Kafka 78
Nacos 46
SpringCloud 32
MySQL 100
UML 12
Seata 22
ZooKeeper 38

这里我们可以借用TreeMap这个集合的特性,put进treeMap之后会默认从小到大自然排序,然后还提供倒序排序的方法descendingMap()

我写段代码示例一下吧:

代码语言:javascript
复制
public static void main(String[] args) {
    TreeMap<Integer, String> treeMap = new TreeMap<>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        //生成随机数
        int num = random.nextInt(100);
        //插入到treeMap
        treeMap.put(num, num + "");
    }
    for (Integer num : treeMap.keySet()) {
        System.out.println(num);
    }
}

//打印结果
0
2
3
6
8
10
11
12
14
15
...

于是乎我们可以开始写代码,先写Mapper类,比较简单,就是按空格分割一下,然后输出到Reduce。

代码语言:javascript
复制
public class TopMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(" ");
        String word = split[0];
        long count = Long.parseLong(split[1]);
        context.write(new LongWritable(count), new Text(word));
    }

}

输出到Reduce之后,Reduce这边就需要收集,然后做一些处理,代码如下:

代码语言:javascript
复制
public class TopReduce extends Reducer<LongWritable, Text, Text, NullWritable> {

    private TreeMap<Long, String> treeMap = new TreeMap<>();

    private static final long TOP_K = 5;

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString()).append("、");
        }
        //去掉最后一个顿号
        sb.deleteCharAt(sb.lastIndexOf("、"));
        treeMap.put(key.get(), sb.toString());
        //如果大于最大长度,则删掉第一个元素,因为第一个元素是最小的
        if (treeMap.size() > TOP_K) {
            treeMap.remove(treeMap.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        //倒序
        Map<Long, String> navigableMap = treeMap.descendingMap();
        //排名
        int i = 1;
        String s;
        for (Map.Entry<Long, String> entry : navigableMap.entrySet()) {
            s = "排名第" + i + "位 " + entry.getValue() + "出现次数" + entry.getKey() + "次";
            context.write(new Text(s), NullWritable.get());
            i++;
        }
    }
    
}

最后再整个入口类Main。

代码语言:javascript
复制
public class TopMain {
    public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(TopMain.class);
        job.setMapperClass(TopMapper.class);
        job.setReducerClass(TopReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

大功告成,然后打包上服务器,并且把测试用的文件也上传到服务器,接着执行命令跑任务:

代码语言:javascript
复制
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar TopMain top_k_main.txt top_k_main_out

输出结果如下:

代码语言:javascript
复制
排名第1位 java出现次数120次
排名第2位 MySQL出现次数100次
排名第3位 Kafka出现次数78次
排名第4位 RabbitMQ出现次数64次
排名第5位 Python出现次数50次

总结

这篇文章主要介绍了排序,去重,求和,求平均数,TopK查询的小例子,可以加深一下对MapReduce的理解,这篇文章就讲到这里了,希望对大家有所帮助。

觉得有用就点个赞吧,你的点赞是我创作的最大动力~

我是一个努力让大家记住的程序员。我们下期再见!!!

能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-03-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java技术爱好者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写在前面
  • 排序
  • 去重
  • 求和
  • 求平均数
  • TopK查询
  • 总结
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档