
Hadoop Basics Tutorial - Chapter 6 Getting Started with MapReduce (6.2 Understanding WordCount)

Chapter 6 Getting Started with MapReduce

6.2 Understanding WordCount

The WordCount program is the "Hello World" of MapReduce. By analyzing WordCount we can understand the basic structure and execution flow of a MapReduce program.

6.2.1 Design Idea of WordCount

WordCount nicely illustrates the MapReduce programming model. In general, text files serve as the MapReduce input. MapReduce splits the text into lines and builds the input key-value pairs, using each line's starting offset in the file as the key and the line's content as the value. The map method processes each pair and emits intermediate results of the form <word,1>. By default, MapReduce groups the intermediate results by key and hands them to the reduce method, which completes the counting and writes out the final result <word,count>.
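For example, with a hypothetical two-line input file containing "Hello World" and "Hello Hadoop", the map phase emits <Hello,1>, <World,1>, <Hello,1>, <Hadoop,1>; the framework then groups these pairs by key, so the reduce phase receives <Hello,[1,1]>, <World,[1]>, <Hadoop,[1]> and writes out <Hello,2>, <World,1>, <Hadoop,1>.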

6.2.2 MapReduce Run Modes

A MapReduce program can run in two ways: locally or on the server side. A local run usually means a local Windows environment, which is convenient for development and debugging, while a server-side run is typically used in real production environments.
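The two modes are usually switched through configuration rather than code changes. A minimal sketch, assuming a Hadoop 2.7.x client on the classpath and the same cluster address used later in this section (the class name RunModeConfig is only illustrative):

import org.apache.hadoop.conf.Configuration;

public class RunModeConfig {
    // Local mode: map and reduce both run inside the submitting JVM, reading the local file system.
    public static Configuration localConf() {
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "local");
        conf.set("fs.defaultFS", "file:///");
        return conf;
    }

    // Cluster mode: the job is submitted to YARN and the input/output live on HDFS.
    public static Configuration clusterConf() {
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("fs.defaultFS", "hdfs://192.168.80.131:9000");
        return conf;
    }
}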

6.2.3 Writing the Code

(1) Create a Java project

(2) Modify the Hadoop source. Note that this is only needed when running a MapReduce program locally on Windows; when running on a Linux server, no modification is required.

Modifying the Hadoop source simply means making a small change to Hadoop's NativeIO class.

Download the matching Hadoop source archive hadoop-2.7.3-src.tar.gz and unpack it. Copy NativeIO.java from hadoop-2.7.3-src\hadoop-common-project\hadoop-common\src\main\java\org\apache\hadoop\io\nativeio into the corresponding package of the Eclipse project, then modify the access method as follows:

    public static boolean access(String path, AccessRight desiredAccess)
        throws IOException {
        // Always report that access is allowed, bypassing the Windows native check
        return true;
        //return access0(path, desiredAccess.accessRight());  // original native call
    }

If the NativeIO class is not modified, running the MapReduce program locally on Windows throws the following exception:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
    at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187)
    at org.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174)
    at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:115)
    at org.apache.hadoop.mapred.LocalDistributedCacheManager.setup(LocalDistributedCacheManager.java:125)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.<init>(LocalJobRunner.java:163)
    at org.apache.hadoop.mapred.LocalJobRunner.submitJob(LocalJobRunner.java:731)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:240)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
    at cn.hadron.mr.RunJob.main(RunJob.java:33)

(3) Define the Mapper class

package cn.hadron.mr;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
// 4 generic parameters: the first two are the key and value types of the map input pairs; the last two are the key and value types of the output pairs
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    // This method is called repeatedly, once for each line read from the file split; the line's starting offset is the key and the line's content is the value
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), ' ');
        for(String w :words){
            context.write(new Text(w), new IntWritable(1));
        }
    }
}

Code notes:

  • The Mapper class reads the input data and runs the map method. To write one, extend org.apache.hadoop.mapreduce.Mapper and implement the map method for the problem at hand.
  • The Mapper class has 4 generic parameters: the first two are the key and value types of the map input key-value pairs, the last two are the key and value types of the output pairs.
  • The MapReduce framework passes each key-value pair to the map method as arguments. The method takes 3 parameters: the first is an Object-typed parameter (usually LongWritable) giving the position (byte offset) of the line in the input, the second is an Object-typed parameter (usually Text) holding the line's content, and the third is a Context parameter representing the execution context.
  • The fully qualified name of Context is org.apache.hadoop.mapreduce.Mapper.Context; that is, Context is an inner class of Mapper and can be used directly inside a Mapper subclass.
  • Inside the map method, StringUtils.split splits the input line into words by spaces, and each word is written out as an intermediate result via the Context write method (a small variant of this method is sketched right after this list).
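A common refinement, which the official example in section 6.2.4 also uses, is to reuse the output Writable objects instead of creating a new Text and IntWritable for every word. A minimal sketch of such a variant (the class name WordCountMapper2 is only illustrative):

package cn.hadron.mr;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class WordCountMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused across all map() calls to avoid allocating two objects per word
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String w : StringUtils.split(value.toString(), ' ')) {
            word.set(w);
            context.write(word, ONE);
        }
    }
}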

(4) Define the Reducer class

package cn.hadron.mr;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    /**
     * In the map output <key,values>, key is a single word and values is the list of counts
     * for that word; the map output is the reduce input. The method is called once per group,
     * and within a group the key is the same while there may be multiple values.
     * So reduce only needs to iterate over values and sum them to get the total count of a word.
     */
    protected void reduce(Text key, Iterable<IntWritable> values,Context context)
            throws IOException, InterruptedException {
        int sum =0;
        for(IntWritable i: values){
            sum=sum+i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Code notes:

  • The Reducer class receives the intermediate results output by the Mapper as its input and runs the reduce method.
  • The Reducer class has 4 generic parameters: the first 2 are the key-value types of the reduce input (matching the map output types), and the last 2 are the key-value types of the reduce output.
  • reduce method parameters: key is a single word, values is the list of counts for that word, and the Context type is org.apache.hadoop.mapreduce.Reducer.Context, the Reducer's context (see the combiner note after this list).
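Because the reduce logic here is a plain sum, which is associative and commutative, the same class can also be registered as a combiner to pre-aggregate the map output before the shuffle, as the program in section 6.2.4 does. A one-line sketch, assuming the Job object job configured in the driver class of the next step:

    job.setCombinerClass(WordCountReducer.class); // optional: combine <word,1> pairs on the map side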

(5) Define the main method (driver class)

package cn.hadron.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

    public static void main(String[] args) {
        //Set the HADOOP_USER_NAME system property so the job runs as user root
        System.setProperty("HADOOP_USER_NAME", "root");
        //The Configuration object holds the Hadoop configuration
        Configuration config =new Configuration();
        //Set fs.defaultFS
        config.set("fs.defaultFS", "hdfs://192.168.80.131:9000");
        //Set the yarn.resourcemanager node
        config.set("yarn.resourcemanager.hostname", "node1");
        try {
            FileSystem fs =FileSystem.get(config);
            Job job =Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("wc");
            //Set the Mapper class
            job.setMapperClass(WordCountMapper.class);
            //Set the Reducer class
            job.setReducerClass(WordCountReducer.class);
            //Set the key type of the reduce output
            job.setOutputKeyClass(Text.class);
            //Set the value type of the reduce output
            job.setOutputValueClass(IntWritable.class);
            //Specify the input path
            FileInputFormat.addInputPath(job, new Path("/user/root/input/"));
            //Specify the output path (created automatically by the job)
            Path outpath =new Path("/user/root/output/");
            //The output path is created by MapReduce itself; if it already exists it must be deleted first
            if(fs.exists(outpath)){
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            //Submit the job and wait for it to finish
            boolean f= job.waitForCompletion(true);
            if(f){
                System.out.println("Job executed successfully");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
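Note that this driver never sets mapreduce.framework.name, so the job is executed by the local job runner inside the IDE's JVM (the LocalJobRunner frames in the earlier stack trace come from exactly this mode), while fs.defaultFS points the input and output at the HDFS instance on 192.168.80.131. In other words, a "local run" here means local computation over data stored in the cluster.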

(6) Run locally

Execution result:

[root@node1 ~]# hdfs dfs -ls /user/root/output
Found 2 items
-rw-r--r--   3 root supergroup          0 2017-05-28 09:01 /user/root/output/_SUCCESS
-rw-r--r--   3 root supergroup         46 2017-05-28 09:01 /user/root/output/part-r-00000
[root@node1 ~]# hdfs dfs -cat /user/root/output/part-r-00000
Hadoop  2
Hello   2
Hi      1
Java    2
World   1
world   1
[root@node1 ~]#

6.2.4 Running on the Server Side

(1) Modify the code

The main method above was written with a local run in mind; for server-side execution it can be simplified. Refer to the official example: http://hadoop.apache.org/docs/r2.7.3/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

Write the Mapper and Reducer classes as static inner classes of the main class:

package cn.hadron.mr;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    // 4 generic parameters, specifying the map input key type, input value type, output key type and output value type
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        // In the map method, value holds one line of the text file (terminated by a line break), and key is the offset of the line's first character from the start of the file
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            // StringTokenizer splits each line into individual words, and <word,1> is written out as the map result
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        // In the map output <key,values>, key is a single word and values is the list of counts for that word; the map output is the reduce input,
        // so reduce only needs to iterate over values and sum them to get the total count of the word.
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    // Run the MapReduce job
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "wordCount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // The first command-line argument is the input path, the second is the output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

(2) Export the jar
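If you prefer the command line to the Eclipse export wizard, an equivalent jar can be built with, for example, jar cvf wordcount.jar -C bin . (assuming the compiled classes are under the project's bin directory).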

(3) Upload to the server and run. As before, use Xftp to upload the wordcount.jar just exported to the desktop to the node1 node:

[root@node1 ~]# hadoop jar wordcount.jar cn.hadron.mr.WordCount input output
17/05/28 10:41:41 INFO client.RMProxy: Connecting to ResourceManager at node1/192.168.80.131:8032
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://node1:9000/user/root/output already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:266)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:139)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
    at cn.hadron.mr.WordCount.main(WordCount.java:59)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

This happens because the output directory already exists; just delete it:

[root@node1 ~]# hdfs dfs -rmr /user/root/output
rmr: DEPRECATED: Please use 'rm -r' instead.
17/05/28 10:42:01 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/root/output

Run again:

[root@node1 ~]# hadoop jar wordcount.jar cn.hadron.mr.WordCount input output
17/05/28 10:43:12 INFO client.RMProxy: Connecting to ResourceManager at node1/192.168.80.131:8032
17/05/28 10:43:14 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/05/28 10:43:15 INFO input.FileInputFormat: Total input paths to process : 2
17/05/28 10:43:15 INFO mapreduce.JobSubmitter: number of splits:2
17/05/28 10:43:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1495804618534_0001
17/05/28 10:43:17 INFO impl.YarnClientImpl: Submitted application application_1495804618534_0001
17/05/28 10:43:17 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1495804618534_0001/
17/05/28 10:43:17 INFO mapreduce.Job: Running job: job_1495804618534_0001
17/05/28 10:43:43 INFO mapreduce.Job: Job job_1495804618534_0001 running in uber mode : false
17/05/28 10:43:43 INFO mapreduce.Job:  map 0% reduce 0%
17/05/28 10:44:19 INFO mapreduce.Job:  map 100% reduce 0%
17/05/28 10:44:33 INFO mapreduce.Job:  map 100% reduce 100%
17/05/28 10:44:35 INFO mapreduce.Job: Job job_1495804618534_0001 completed successfully
17/05/28 10:44:36 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=89
        FILE: Number of bytes written=355368
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=301
        HDFS: Number of bytes written=46
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Killed map tasks=1
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=62884
        Total time spent by all reduces in occupied slots (ms)=12445
        Total time spent by all map tasks (ms)=62884
        Total time spent by all reduce tasks (ms)=12445
        Total vcore-milliseconds taken by all map tasks=62884
        Total vcore-milliseconds taken by all reduce tasks=12445
        Total megabyte-milliseconds taken by all map tasks=64393216
        Total megabyte-milliseconds taken by all reduce tasks=12743680
    Map-Reduce Framework
        Map input records=6
        Map output records=14
        Map output bytes=140
        Map output materialized bytes=95
        Input split bytes=216
        Combine input records=14
        Combine output records=7
        Reduce input groups=6
        Reduce shuffle bytes=95
        Reduce input records=7
        Reduce output records=6
        Spilled Records=14
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=860
        CPU time spent (ms)=10230
        Physical memory (bytes) snapshot=503312384
        Virtual memory (bytes) snapshot=6236766208
        Total committed heap usage (bytes)=301146112
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=85
    File Output Format Counters 
        Bytes Written=46
[root@node1 ~]# 

Check the result:

[root@node1 ~]# hdfs dfs -ls /user/root/output
Found 2 items
-rw-r--r--   3 root supergroup          0 2017-05-28 10:44 /user/root/output/_SUCCESS
-rw-r--r--   3 root supergroup         46 2017-05-28 10:44 /user/root/output/part-r-00000
[root@node1 ~]# hdfs dfs -cat /user/root/output/part-r-00000
Hadoop  2
Hello   2
Hi      1
Java    2
World   1
world   1

Follow-up Issue

2017-06-24: Running the MapReduce program written earlier again today produced this error:

(null) entry in command string: null chmod 0700

Solution: (1) Download hadoop-2.7.3.tar.gz and unpack it, for example to drive D, so the Hadoop root directory is D:\hadoop-2.7.3. (2) Copy the debug tool (winutils.exe) into HADOOP_HOME/bin.

(3) Set the environment variables (e.g. HADOOP_HOME pointing to the unpacked directory).
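Alternatively, the Hadoop home directory can also be pointed at from code, before the Configuration object is created. A minimal sketch, assuming the archive was unpacked to D:\hadoop-2.7.3 and winutils.exe was copied into its bin directory:

// Assumption: D:\hadoop-2.7.3\bin contains winutils.exe.
// On Windows the Hadoop client resolves winutils.exe via the hadoop.home.dir system property
// (or the HADOOP_HOME environment variable).
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");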
