前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >解析一下WordCount项目

解析一下WordCount项目

作者头像
可爱见见
发布2019-09-09 16:27:53
4400
发布2019-09-09 16:27:53
举报
文章被收录于专栏:卡尼慕

那么我们先来看看这个项目是干什么的。

假设这里有一份文本,里面写了很多单词,当然要有分隔的输入,如下图:

我们需要统计文本中每个单词出现的次数,我们看看怎么完成的。

1

那么我们先来简单地看看项目逻辑,他到底是怎么完成简单的单词计数的。看图。

很显然,首先我们拿到文本不是立刻就送去MapReduce中处理,而是先通过一个叫做TextInputFormat的类,处理好原有文本的数据,用偏移量逐个表识。然后再传入map中处理。然而map只是对单词进行简单的编号(同时编上1),在再进入reduce类之前,先通过迭代器(图中黑色部分,等会会补上),把map的数据简单的处理,如上图的hadoop为例,迭代器的内容就是<hadoop,iterator(1,1)>,那么在reduce类中,我们只要统计好迭代器中的数值就好了。

2

那我们开始写自己的map类。上代码。

代码语言:javascript
复制
//1、自己的map类
    //继承Mapper类,<KEYIN, VALUEIN, KEYOUT, VALUEOUT>   输入的key,输入的value,输出的key,输出的value
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        //创建一个IntWritable类型的对象,给定值为1
        IntWritable i = new IntWritable(1);
        Text keystr  = new Text();
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String line = value.toString();
            //传入每一个map方法的key和value做打印
            System.out.println("key : "+key.get()+"--------- value : "+line);
            String [] strs = line.split(" ");
            for (String str : strs) {
                //每一次循环遍历到一个单词就要输出到下一个步骤
                keystr.set(str);
                System.out.println("map的输出:key : ("+str+",1)");
                context.write(keystr, i);
            }

    /*      StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    keystr.set(itr.nextToken());
                   context.write(keystr, i);
                  }*/
        }
    }

这里要给大家普及一下Hadoop的数据类型

1、IntWritable

2、ByteWritable

3、FloatWritable

4、LongWritable

5、DoubleWritable

6、NullWritable

7、Text (等于常用的String)

这些就不解释了,看看名字就知道了吧。

来看看代码。首先这个MyMapper类要继承Mapper类,这个类后面的参数是什么呢?<LongWritable, Text, Text, IntWritable>,为什么要这样写呢。

实际上这四个参数分别对应<KEYIN, VALUEIN, KEYOUT, VALUEOUT>,也就是输入的key,输入的value,输出的key,输出的value

很简单,我们来看看上面的代码逻辑,在传入map类之前是不是已经经过了一个叫做TextInputFormat的东西,他把我们的文本变成了偏移量+单词,这就很好的对应了,这里的偏移量(LongWritable)作为KETIN,单词(Text)作为VALUEIN。那么输出到reduce的结果是怎样的呢?是单词+编号(1),因此单词(Text)作为KETOUT,编号(IntWritable)作为VALUEOUT

然后我们使用的是Mapper类的map方法。这里要注意以下,使用Context来把map中的结果传给reduce,但是要注意contect的使用方法

我们先把字符串切割成我们需要的数组,就是把空格给切割掉,然后循环遍历数组,每次循环遍历到一个单词就要输出到下个步骤,也就是使用刚才讲的context,但是这里要注意,context是不接受String类型的,因此这里要提前创建好Text类的对象,在每次循环的时候使用set方法构建Text,作为context的参数,同样,Int类型也要使用IntWritable。提一句,这里的循环遍历有比较多的方式,这里的话采用的是foreach的方式,当然效率不高,也可以使用他们已经写好的方式稍微修改也可以,这边方便理解就使用foreach了。

3

好了,我们已经写完了MyMapper,现在开始写MyReducer类,上代码。

代码语言:javascript
复制
//2自己的reduce类
    //  reduce类的输入,其实就是map类中map方法的输出            输入key  输入value  输出key  输出value
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
            IntWritable countwritable = new IntWritable();
        @Override
        //Map类的map方法的数据输入到Reduce类的group方法中,得到<text,it(1,1)>,再将这个数据输入reduce类到reduce方法中
        protected void reduce(Text inputkey, Iterable<IntWritable> inputvalue,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            //得到了key
            String key = inputkey.toString();
            //迭代Iterable,把每一个值相加
            int count = 0;
            //循环遍历迭代器中的所有值,做相加
            for (IntWritable intWritable : inputvalue) {
                count = count + intWritable.get();
            }
            //把值设置到IntWritable,等待输出
            countwritable.set(count);
            System.out.println("reduce输出结果:key : "+key+" , "+ count);
            context.write(inputkey, countwritable);
        }       
    }

MyReducer类参数还是一样的,我们要得到的结果是:“hadoop,2”类似与这种形式,因此,KEYOUT:Text,VALUEOUT:IntWritable,KETIN:Text,VALUEIN:IntWriatble。

这边就实现了reduce方法。好,你一看就会觉得很好奇了,为什么reduce方法的参数跟MyReducer类的不一样呢?传进来的不应该是Text类型和IntWritable类型吗,为什么变成了一个Iterable<IntWritable>类型的参数呢?那么请看回去 1 中的图解,黑的那块到底是什么呢?

没错,iterable就是一个迭代器!那么我现在重新补充一下逻辑图,请看:

这里就是把Map类中的map方法的数据输入到Reduce类中的group方法中,得到<text,it(1,1)>,再将这个数据输入到Reduce类的reduce方法中。

那么接下来reduce类,只要把iterable中的每个值进行一个累加不就得到了单词的数量了吗,然后再把这个值做成一个IntWritable类型输出出去不就行了嘛!!太聪明了。

然而实际上逻辑图还没有完整,在map类中还存在一个步骤,慢慢地会讲到。

4

运行类:run方法和main方法(调用run方法)。

代码语言:javascript
复制
//3运行类,run方法,在测试的时候使用main函数,调用这个类的run方法来运行

    /**
     * 
     * @param args 参数是要接受main方法得到的参数,在run中使用
     * @return
     * @throws Exception
     */
    public int run(String[] args) throws Exception {  
        //hadoop的配置的上下文!
        Configuration configuration = new Configuration();  
        //通过上下文,构建一个job实例,并且传入任务名称,单例!
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());

        //这参数必须添加,否则本地运行没有问题,服务器上运行会报错
        job.setJarByClass(MyMapReduce.class);

        //设置任务从哪里读取数据?
        //调用这个方法的时候,要往args中传入参数,第一个位置上要传入从哪里读数据
        Path inputpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputpath);
        //设置任务结果数据保存到哪里?
      //调用这个方法的时候,要往args中传入参数,第二个位置上要传入结果数据保存到哪里
        Path outputpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputpath);

        //设置mapper类的参数
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(2);

        //设置reduce类的参数
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // submit job -> YARN  
        boolean isSuccess = job.waitForCompletion(true);  
        return isSuccess ? 0 : 1;  
    }  


    public static void main(String[] args) {
        args= new String[]{
                "hdfs://192.168.6.129:8020/1.txt",
                "hdfs://192.168.6.129:8020/output"
        };

        MyMapReduce mr = new MyMapReduce();
        try {
            int success = -1; 
            success = mr.run(args);
            System.out.println("success:"+success);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

首先来看看run方法的参数,为什么是String [],args呢?这里的run方法的参数是要接收main方法的参数在run中使用。其他的就直接看代码即可。

然后是main方法,这里传入参数:输入路径和输出路径。最后成功运行即可。

5

最最最最后,本地跑完以后直接打jar包,传入服务器跑,这里要注意在打包之前要把main方法中的参数给注释掉,因为在服务器上面是直接传参的。

源代码的话已经上传到我的github上,在文件夹:WorldCount中,欢迎大家围观。

程序猿

改变世界

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-10-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 卡尼慕 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档