前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Eclipse下Hadoop的MapReduce开发之MapReduce编写

Eclipse下Hadoop的MapReduce开发之MapReduce编写

作者头像
尚浩宇
发布2018-08-17 10:10:34
5070
发布2018-08-17 10:10:34
举报
文章被收录于专栏:杂烩

hadoop安装部署及Eclipse安装集成,这里不赘述了。

    先说下业务需求吧,有个系统日志文件,记录系统的运行信息,其中包含DEBUG、INFO、WARN、ERROR四个级别的日志,现在想要看到所有级别各有多少条记录。

    创建一个map/reduce项目,项目名为mapreducetest。在src下建立一个名为mapreducetest的包,然后建一个类名叫MapReduceTest,下面是代码。

代码语言:javascript
复制
package mapreducetest;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceTest extends Configuration implements Tool{
    /**
     * 配置
     */
    private Configuration configuration;
    /**
     * 获取配置
     */
    @Override
    public Configuration getConf() {
        return this.configuration;
    }
    /**
     * 设置配置
     */
    @Override
    public void setConf(Configuration arg0) {
        this.configuration=arg0;
    }
    /**
     * 
     * @ClassName: Counter 
     * @Description: TODO(计数器) 
     * @author scc
     * @date 2015年5月27日 下午2:54:39 
     *
     */
    enum Counter{
        TIMER
    }
    /**
     * 
     * @ClassName: Map 
     * @Description: map实现,所有的map业务都在这里进行Mapper后的四个参数分别为,输入key类型,输入value类型,输出key类型,输出value类型
     * @author scc
     * @date 2015年5月27日 下午2:30:06 
     * @
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text>{
        /**
         * key:输入key
         * value:输入value
         * context:map上下文对象
         * 说明,hdfs生成的所有键值对都会调用此方法
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try{
                //得到日志每一行数据
                String mapvalue=value.toString();
                //日志具有固定格式,通过空格切分可以获得固定打下的string数组
                String[] infos=mapvalue.split(" ");
                //时间在数组的第一列,日志级别在数据的第九列,
                String info=infos[10];
                //调整数据格式(第一个参数为key,第二个参数为value),这里key和value都设置为日志级别
                context.write(new Text(info), new Text(info));
            }catch(Exception e){
                //遇到错误是记录错误
                context.getCounter(Counter.TIMER).increment(1);
                return;
            }
        }
        
    }
    /**
     * 
     * @ClassName: Reduce 
     * @Description: reduce处理类 ,Reducer四个参数,前两个是输入key和value的类型,必须和map一样,后两个是输出的key和value的类型
     * @author scc
     * @date 2015年5月27日 下午3:33:06 
     *
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text>{
        /**
         * 第一个参数输入的value,第二个参数是该key对应的所有的value集合,第三个是reducer的上下文
         * 说明:与map不同的这里是对map处理后的数据进行的调用,当map处理后的key有重复时,这里传进来的key会是去重后的key,比方说在map里放进10个键值对,
         * 其中有五个key是key1,有五个是key2,那么在reduce的时候只会调用两次reduce,分别是key1和key2
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,Context arg2) throws IOException, InterruptedException {
            //获取当前遍历的key
            String info=key.toString();
            //计数器
            int count=0;
            //当值和key相同时计数器加1
            for (Text text : values) {
                if(info.equals(text.toString()))
                    count=count+1;
            }
            //将级别和对应的数据写出去
            arg2.write(key, new Text(String.valueOf(count)));
        }
    }
    /**
     * run方法是一个入口
     */
    @Override
    public int run(String[] arg0) throws Exception {
        //建立一个job,并指定job的名称
        Job job=Job.getInstance(getConf(), "maptest");
        //指定job的类
        job.setJarByClass(MapReduceTest.class);
        //设置日志文件路径(hdfs路径)
        FileInputFormat.setInputPaths(job,  new Path(arg0[0]));
        //设置结果输出路径(hdfs路径)
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        //设置map处理类的class
        job.setMapperClass(Map.class);
        //设置reduce的class
        job.setReducerClass(Reduce.class);
        //设置输出格式化的类的class
        job.setOutputFormatClass(TextOutputFormat.class);
        //设置输出key的类型
        job.setOutputKeyClass(Text.class);
        //设置输出value的类型
        job.setOutputValueClass(Text.class);        
        //设置等待job完成
        job.waitForCompletion(true);
        return job.isSuccessful()?0:1;
    }
    public static void main(String[] args) throws Exception {
            String[] args2=new String[2];
            args2[0]="hdfs://192.168.1.55:9000/test2-in/singlemaptest.log";
            args2[1]="hdfs://192.168.1.55:9000/test2-out";
            int res=ToolRunner.run(new Configuration(), new MapReduceTest(), args2);
            System.exit(res);
    }
}

下面是生成的结果:

代码语言:javascript
复制
INFO	3800
WARN	55
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2015/05/27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档