首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
Hadoop面试复习系列——HDFS(一)
2
大数据技术之_04_Hadoop学习_01_HDFS_HDFS概述+HDFS的Shell操作(开发重点)+HDFS客户端操作(开发重点)+HDFS的数据流(面试重点)+NameNode和Seconda
3
大数据技术之_05_Hadoop学习_02_MapReduce_MapReduce框架原理+InputFormat数据输入+MapReduce工作流程(面试重点)+Shuffle机制(面试重点)
4
大数据技术之_05_Hadoop学习_01_MapReduce_MapReduce概述+Hadoop序列化
5
大数据技术之_03_Hadoop学习_01_入门_大数据概论+从Hadoop框架讨论大数据生态+Hadoop运行环境搭建(开发重点)
6
大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客
7
大数据技术之_06_Zookeeper学习_Zookeeper入门+Zookeeper安装+Zookeeper内部原理+Zookeeper实战(开发重点)+企业面试真题
8
大数据技术之_09_Hive学习_复习与总结
9
大数据技术之_07_Hadoop学习_HDFS_HA(高可用)_HA概述+HDFS-HA工作机制+HDFS-HA集群配置+YARN-HA配置+HDFS Federation(联邦) 架构设计
10
大数据技术之_08_Hive学习_01_Hive入门+Hive安装、配置和使用+Hive数据类型
11
大数据技术之_08_Hive学习_04_压缩和存储(Hive高级)+ 企业级调优(Hive优化)
12
大数据技术之_08_Hive学习_05_Hive实战之谷粒影音(ETL+TopN)+常见错误及解决方案
13
大数据技术之_08_Hive学习_02_DDL数据定义(创建/查询/修改/删除数据库+创建表+分区表+修改表+删除表)+DML数据操作(数据导入+数据导出+清除表中数据)
14
大数据技术之_08_Hive学习_03_查询+函数
15
大数据技术之_16_Scala学习_09_函数式编程-高级
16
大数据技术之_09_Flume学习_Flume概述+Flume快速入门+Flume企业开发案例+Flume监控之Ganglia+Flume高级之自定义MySQLSource+Flume企业真实面试题(
17
大数据技术之_13_Azkaban学习_Azkaban(阿兹卡班)介绍 + Azkaban 安装部署 + Azkaban 实战
18
大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例
19
大数据技术之_12_Sqoop学习_Sqoop 简介+Sqoop 原理+Sqoop 安装+Sqoop 的简单使用案例+Sqoop 一些常用命令及参数
20
大数据技术之_16_Scala学习_13_Scala语言的数据结构和算法_Scala学习之旅收官之作
21
大数据技术之_19_Spark学习_06_Spark 源码解析 + Spark 通信架构、脚本解析、standalone 模式启动、提交流程 + Spark Shuffle 过程 + Spark 内存
22
大数据技术之_16_Scala学习_04_函数式编程-基础+面向对象编程-基础
23
大数据技术之_14_Oozie学习
24
大数据技术之_26_交通状态预测项目_01
25
大数据技术之_16_Scala学习_02_变量
26
大数据技术之_16_Scala学习_07_数据结构(上)-集合
27
大数据技术之_28_电商推荐系统项目_01
28
大数据技术之_28_电商推荐系统项目_02
29
大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址
30
大数据技术之_16_Scala学习_01_Scala 语言概述
31
大数据技术之_29_MySQL 高級面试重点串讲_02
32
大数据技术之_18_大数据离线平台_05_离线平台项目模块小结
33
大数据技术之_19_Spark学习_06_Spark 源码解析小结
34
大数据技术之_16_Scala学习_05_面向对象编程-中级
35
大数据技术之_16_Scala学习_08_数据结构(下)-集合操作+模式匹配
36
大数据技术之_24_电影推荐系统项目_05_项目系统设计
37
大数据技术之_19_Spark学习_03_Spark SQL 应用解析小结
38
大数据技术之_19_Spark学习_07_Spark 性能调优小结
39
大数据技术之_19_Spark学习_05_Spark GraphX 应用解析小结
40
大数据技术之_19_Spark学习_02_Spark Core 应用解析小结
41
大数据技术之_24_电影推荐系统项目_08_项目总结及补充
42
大数据技术之_19_Spark学习_01_Spark 基础解析小结(无图片)
43
大数据技术之_18_大数据离线平台_03_数据处理+工具代码导入+业务 ETL 实现+创建数据库表
44
大数据技术之_24_电影推荐系统项目_02_Python 基础语法复习
45
大数据技术之_24_电影推荐系统项目_06_项目体系架构设计 + 工具环境搭建 + 创建项目并初始化业务数据 + 离线推荐服务建设 + 实时推荐服务建设 + 基于内容的推荐服务建设
46
大数据技术之_27_电商平台数据分析项目_01_大数据的框架回顾 + 大数据的企业应用
47
大数据技术之_23_Python核心基础学习_03_函数 + 对象(12.5小时)
48
大数据技术之_32_大数据面试题_01_Hive 基本面试 + Hive 数据分析面试 + Flume + Kafka 面试
49
大数据技术之_23_Python核心基础学习_04_ 异常 + 文件(3.5小时)
50
大数据技术之_16_Scala学习_03_运算符+程序流程控制
清单首页hadoop文章详情

大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客


第6章 Hadoop企业优化(重中之重)

6.1 MapReduce 跑的慢的原因

6.2 MapReduce优化方法

  MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。

6.2.1 数据输入

6.2.2 Map阶段

6.2.3 Reduce阶段

6.2.4 I/O传输

6.2.5 数据倾斜问题

6.2.6 常用的调优参数

1、资源相关参数 (1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)

(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)

(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)

2、容错相关参数(MapReduce性能优化)

6.3 HDFS小文件优化方法

6.3.1 HDFS小文件弊端

  HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢

6.3.2 HDFS小文件解决方案

小文件的优化无非以下几种方式:   (1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。   (2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。   (3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。

第7章 MapReduce扩展案例

7.1 倒排索引案例(多job串联)

1、需求   有大量的文本(文档、网页),需要建立搜索索引,如下图所示。 (1)数据输入

(2)期望输出数据

代码语言:javascript
复制
atguigu    c.txt-->2   b.txt-->2   a.txt-->3   
pingping    c.txt-->1   b.txt-->3   a.txt-->1   
ss    c.txt-->1   b.txt-->1   a.txt-->2   

2、需求分析

3、第一次处理 (1)第一次处理,编写OneIndexMapper类

代码语言:javascript
复制
package com.atguigu.mr.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    String name;
    Text k = new Text();
    IntWritable v = new IntWritable();

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        // 获取文件名称
        FileSplit split = (FileSplit) context.getInputSplit();
        name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // atguigu pingping

        // 1、获取一行数据
        String line = value.toString();

        // 2、切割
        String[] fields = line.split(" ");

        for (String word : fields) {
            // 3、拼接
            k.set(word + "---" + name); // atguigu---a.txt
            v.set(1);
            // 4、写出
            context.write(k, v); // <atguigu---a.txt,1>
        }
    }
}

(2)第一次处理,编写OneIndexReducer类

代码语言:javascript
复制
package com.atguigu.mr.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

        // 1、累加求和
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        v.set(sum);

        // 2、写出
        context.write(key, v);
    }
}

(3)第一次处理,编写OneIndexDriver类

代码语言:javascript
复制
package com.atguigu.mr.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneIndexDriver {

    public static void main(String[] args) throws Exception {

        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "d:/temp/atguigu/0529/input/inputoneindex", "d:/temp/atguigu/0529/output17" };

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(OneIndexDriver.class);

        job.setMapperClass(OneIndexMapper.class);
        job.setReducerClass(OneIndexReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

(4)查看第一次输出结果

代码语言:javascript
复制
atguigu---a.txt    3
atguigu---b.txt    2
atguigu---c.txt    2
pingping---a.txt    1
pingping---b.txt    3
pingping---c.txt    1
ss---a.txt    2
ss---b.txt    1
ss---c.txt    1

4、第二次处理 (1)第二次处理,编写TwoIndexMapper类

代码语言:javascript
复制
package com.atguigu.mr.index;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 输入为:
        // atguigu--a.txt   3
        // atguigu--b.txt   2
        // atguigu--c.txt   2
        // 输出为:(atguigu,a.txt   3)atguigu   c.txt-->2   b.txt-->2   a.txt-->3

        // 1、获取一行数据
        String line = value.toString();

        // 2、用“--”切割
        String[] fields = line.split("--"); // 结果为:(atguigu,a.txt   3)

        // 3、封装数据
        k.set(fields[0]);
        v.set(fields[1]);

        // 4、写出
        context.write(k, v);
    }
}

(2)第二次处理,编写TwoIndexReducer类

代码语言:javascript
复制
package com.atguigu.mr.index;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {

    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // 输入为:(atguigu,a.txt   3)(atguigu,b.txt    2)(atguigu,c.txt    2)
        // 输出为:atguigu  c.txt-->2   b.txt-->2   a.txt-->3

        StringBuffer sb = new StringBuffer();

        // 拼接
        for (Text value : values) {
            sb.append(value.toString().replace("\t", "-->") + "\t");
        }

        // 封装
        v.set(sb.toString());

        // 写出
        context.write(key, v);
    }
}

(3)第二次处理,编写TwoIndexDriver类

代码语言:javascript
复制
package com.atguigu.mr.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoIndexDriver {

    public static void main(String[] args) throws Exception {

        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "d:/temp/atguigu/0529/input/inputtowindex", "d:/temp/atguigu/0529/output18" };

        Configuration config = new Configuration();
        Job job = Job.getInstance(config);

        job.setJarByClass(TwoIndexDriver.class);
        job.setMapperClass(TwoIndexMapper.class);
        job.setReducerClass(TwoIndexReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

(4)第二次查看最终结果

代码语言:javascript
复制
atguigu    c.txt-->2   b.txt-->2   a.txt-->3
pingping    c.txt-->1   b.txt-->3   a.txt-->1
ss    c.txt-->1   b.txt-->1   a.txt-->2

7.2 TopN案例

1、需求   对需求2.3输出结果进行加工,输出流量使用量在前10的用户信息。 (1)输入数据 (2)输出数据

2、需求分析   同上图。 3、实现代码 (1)编写FlowBean类

代码语言:javascript
复制
package com.atguigu.mr.topn;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow; // 上行流量
    private long downFlow; // 下行流量
    private long sumFlow; // 总流量

    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void set(long downFlow2, long upFlow2) {
        downFlow = downFlow2;
        upFlow = upFlow2;
        sumFlow = downFlow2 + upFlow2;
    }

    @Override
    public int compareTo(FlowBean bean) {

        int result;

        // 按照总流量大小,倒序排列
        if (this.sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if (this.sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            result = 0;
        }

        return result;
    }
}

(2)编写TopNMapper类

代码语言:javascript
复制
package com.atguigu.mr.topn;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // 定义一个TreeMap作为存储数据的容器(天然按key排序,降序)
    private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();
    private FlowBean kBean;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        kBean = new FlowBean();
        Text v = new Text();

        // 13470253144  180 180 360

        // 1、获取一行
        String line = value.toString();

        // 2、切割
        String[] fields = line.split("\t");

        // 3、封装数据
        String phoneNum = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        long sumFlow = Long.parseLong(fields[3]);

        kBean.setUpFlow(upFlow);
        kBean.setDownFlow(downFlow);
        kBean.setSumFlow(sumFlow);

        v.set(phoneNum);

        // 4、向TreeMap中添加数据
        flowMap.put(kBean, v);

        // 5、限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据
        if (flowMap.size() > 10) {
            // flowMap.remove(flowMap.firstKey()); // 升序删除第一个
            flowMap.remove(flowMap.lastKey()); // 降序删除最后一个
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        // 6、遍历TreeMap集合,输出数据
        Iterator<FlowBean> bean = flowMap.keySet().iterator();

        while (bean.hasNext()) {
            FlowBean k = bean.next();
            context.write(k, flowMap.get(k));
        }
    }
}

(3)编写TopNReducer类

代码语言:javascript
复制
package com.atguigu.mr.topn;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)
    TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        for (Text value : values) {
            FlowBean bean = new FlowBean();
            bean.set(key.getDownFlow(), key.getUpFlow());

            // 1、向treeMap集合中添加数据
            flowMap.put(bean, new Text(value));

            // 2、限制TreeMap数据量,超过10条就删除掉流量最小的一条数据
            if (flowMap.size() > 10) {
                // flowMap.remove(flowMap.firstKey()); // 升序删除第一个
                flowMap.remove(flowMap.lastKey()); // 降序删除最后一个
            }
        }
    }

    @Override
    protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {

        // 3、遍历集合,输出数据
        Iterator<FlowBean> bean = flowMap.keySet().iterator();

        while (bean.hasNext()) {
            FlowBean v = bean.next();
            context.write(new Text(flowMap.get(v)), v);
        }
    }
}

(4)编写TopNDriver类

代码语言:javascript
复制
package com.atguigu.mr.topn;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)
    TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        for (Text value : values) {
            FlowBean bean = new FlowBean();
            bean.set(key.getDownFlow(), key.getUpFlow());

            // 1、向treeMap集合中添加数据
            flowMap.put(bean, new Text(value));

            // 2、限制TreeMap数据量,超过10条就删除掉流量最小的一条数据
            if (flowMap.size() > 10) {
                // flowMap.remove(flowMap.firstKey()); // 升序
                flowMap.remove(flowMap.lastKey()); // 降序
            }
        }
    }

    @Override
    protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {

        // 3、遍历集合,输出数据
        Iterator<FlowBean> it = flowMap.keySet().iterator();

        while (it.hasNext()) {
            FlowBean v = it.next();
            context.write(new Text(flowMap.get(v)), v);
        }
    }
}

7.3 找博客共同粉丝案例

1、需求   以下是博客的粉丝列表数据,冒号前是一个用户,冒号后是该用户的所有粉丝(数据中的粉丝关系是单向的)   求出哪些人两两之间有共同粉丝,及他俩的共同粉丝都有谁? (1)数据输入

代码语言:javascript
复制
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

2、需求分析 先求出A、B、C、……等是谁的粉丝 第一次输出结果

代码语言:javascript
复制
A    I,K,C,B,G,F,H,O,D,
B    A,F,J,E,
C    A,E,B,H,F,G,K,
D    G,C,K,A,L,F,E,H,
E    G,M,L,H,A,F,B,D,
F    L,M,D,C,G,A,
G    M,
H    O,
I    O,C,
J    O,
K    B,
L    D,E,
M    E,F,
O    A,H,I,J,F,

第二次输出结果

代码语言:javascript
复制
A-B    E C 
A-C    D F 
A-D    E F 
A-E    D B C 
A-F    O B C D E 
A-G    F E C D 
A-H    E C D O 
A-I    O 
A-J    O B 
A-K    D C 
A-L    F E D 
A-M    E F 
B-C    A 
B-D    A E 
B-E    C 
B-F    E A C 
B-G    C E A 
B-H    A E C 
B-I    A 
B-K    C A 
B-L    E 
B-M    E 
B-O    A 
C-D    A F 
C-E    D 
C-F    D A 
C-G    D F A 
C-H    D A 
C-I    A 
C-K    A D 
C-L    D F 
C-M    F 
C-O    I A 
D-E    L 
D-F    A E 
D-G    E A F 
D-H    A E 
D-I    A 
D-K    A 
D-L    E F 
D-M    F E 
D-O    A 
E-F    D M C B 
E-G    C D 
E-H    C D 
E-J    B 
E-K    C D 
E-L    D 
F-G    D C A E 
F-H    A D O E C 
F-I    O A 
F-J    B O 
F-K    D C A 
F-L    E D 
F-M    E 
F-O    A 
G-H    D C E A 
G-I    A 
G-K    D A C 
G-L    D F E 
G-M    E F 
G-O    A 
H-I    O A 
H-J    O 
H-K    A C D 
H-L    D E 
H-M    E 
H-O    A 
I-J    O 
I-K    A 
I-O    A 
K-L    D 
K-O    A 
L-M    E F

3、代码实现 (1)第一次Mapper类

代码语言:javascript
复制
package com.atguigu.mr.friends;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // A:B,C,D,F,E,O

        // 1、获取一行
        String line = value.toString();

        // 2、切割
        String[] fields = line.split(":");

        // 3、获取用户和用户的粉丝
        String user = fields[0]; // person = A
        String[] friends = fields[1].split(","); // firends = [B, C, D, F, E, O]

        // 封装
        v.set(user);

        // 4、写出去
        for (String friend : friends) {
            k.set(friend);
            context.write(k, v); // <粉丝,用户>  <B,A><C,A><D,A>
        }
    }
}

(2)第一次Reducer类

代码语言:javascript
复制
package com.atguigu.mr.friends;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text> {

    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        StringBuffer sb = new StringBuffer();

        // <B,A><C,A><D,A>
        // 1、拼接
        for (Text user : values) {
            sb.append(user).append(","); // 
        }

        v.set(sb.toString());

        // 2、写出
        context.write(key, v); // A I,K,C,B,G,F,H,O,D,
    }
}

(3)第一次Driver类

代码语言:javascript
复制
package com.atguigu.mr.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneShareFriendsDriver {
    public static void main(String[] args) throws Exception {

        // 0、根据自己电脑路径重新配置
        args = new String[] { "d:/temp/atguigu/0529/input/inputfriend", "d:/temp/atguigu/0529/output21" };

        // 1、获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2、指定jar包运行的路径
        job.setJarByClass(OneShareFriendsDriver.class);

        // 3、指定map/reduce使用的类
        job.setMapperClass(OneShareFriendsMapper.class);
        job.setReducerClass(OneShareFriendsReducer.class);

        // 4、指定map输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5、指定最终输出的数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6、指定job的输入原始所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7、提交
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

(4)第二次Mapper类

代码语言:javascript
复制
package com.atguigu.mr.friends;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {



    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // A   I,K,C,B,G,F,H,O,D,
        // 粉丝    用户,用户,用户

        // 1、获取一行
        String line = value.toString();

        // 2、切割
        String[] friend_users = line.split("\t");

        // A
        String friend = friend_users[0];
        // I,K,C,B,G,F,H,O,D,
        String[] users = friend_users[1].split(",");

        Arrays.sort(users); // B,C,D,F,G,H,I,K,O

        for (int i = 0; i < users.length - 1; i++) {
            for (int j = i + 1; j < users.length; j++) {
                context.write(new Text(users[i] + "-" + users[j]), new Text(friend));
            }
        }
    }
}

(5)第二次Reducer类

代码语言:javascript
复制
package com.atguigu.mr.friends;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,Context context)
            throws IOException, InterruptedException {

        StringBuffer sb = new StringBuffer();

        for (Text friend : values) {
            sb.append(friend).append(" ");
        }

        context.write(key, new Text(sb.toString()));
    }
}

(6)第二次Driver类

代码语言:javascript
复制
package com.atguigu.mr.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoShareFriendsDriver {
    public static void main(String[] args) throws Exception {

        // 0、根据自己电脑路径重新配置
        args = new String[] { "d:/temp/atguigu/0529/input/inputfriends", "d:/temp/atguigu/0529/output22" };

        // 1、获取job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2、指定jar包运行的路径
        job.setJarByClass(TwoShareFriendsDriver.class);

        // 3、指定map/reduce使用的类
        job.setMapperClass(TwoShareFriendsMapper.class);
        job.setReducerClass(TwoShareFriendsReducer.class);

        // 4、指定map输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //
        // 5、指定最终输出的数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6、指定job的输入原始所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7、提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

第8章 常见错误及解决方案

1)导包容易出错。尤其Text和CombineTextInputFormat。 2)Mapper中第一个输入的参数必须是LongWritable或者NullWritable,不可以是IntWritable,报的错误是类型转换异常。 3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656(4),说明Partition和ReduceTask个数没对上,调整ReduceTask个数。 4)如果分区数不是1,但是reducetask为1,是否执行分区过程。   答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。 5)在Windows环境编译的jar包导入到Linux环境中运行:

代码语言:javascript
复制
hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/ /user/atguigu/output

报如下错误:

代码语言:javascript
复制
Exception in thread "main" java.lang.UnsupportedClassVersionError: com/atguigu/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0

  原因是Windows环境用的jdk1.7,Linux环境用的jdk1.8。   解决方案:统一jdk版本。 6)缓存pd.txt小文件案例中,报找不到pd.txt文件   原因:大部分为路径书写错误。还有就是要检查pd.txt.txt的问题。还有个别电脑写相对路径找不到pd.txt,可以修改为绝对路径。 7)报类型转换异常。   通常都是在驱动函数中设置Map输出和最终输出时编写错误。   Map输出的key如果没有排序,也会报类型转换异常。 8)集群中运行wc.jar时出现了无法获得输入文件。   原因:WordCount案例的输入文件不能放用HDFS集群的根目录。 9)出现了如下相关异常

代码语言:javascript
复制
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:364)

解决方案一:拷贝hadoop.dll文件(文件位置:D:\work\Hadoop\hadoop-2.7.2\bin)到Windows目录C:\Windows\System32。个别同学电脑还需要修改Hadoop源码。 解决方案二:创建如下包名,并将NativeIO.java拷贝到该包名下

10)自定义Outputformat时,注意在RecordWirter中的close()方法必须关闭流资源。否则输出的文件内容中数据为空。

代码语言:javascript
复制
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        if (atguigufos != null) {
            atguigufos.close();
        }
        if (otherfos != null) {
            otherfos.close();
        }
    }
下一篇
举报
领券