hadoop安装部署及Eclipse安装集成,这里不赘述了。
先说下业务需求吧,有个系统日志文件,记录系统的运行信息,其中包含DEBUG、INFO、WARN、ERROR四个级别的日志,现在想要看到所有级别各有多少条记录。
创建一个map/reduce项目,项目名为mapreducetest。在src下建立一个名为mapreducetest的包,然后建一个类名叫MapReduceTest,下面是代码。
package mapreducetest;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MapReduceTest extends Configuration implements Tool{
/**
* 配置
*/
private Configuration configuration;
/**
* 获取配置
*/
@Override
public Configuration getConf() {
return this.configuration;
}
/**
* 设置配置
*/
@Override
public void setConf(Configuration arg0) {
this.configuration=arg0;
}
/**
*
* @ClassName: Counter
* @Description: TODO(计数器)
* @author scc
* @date 2015年5月27日 下午2:54:39
*
*/
enum Counter{
TIMER
}
/**
*
* @ClassName: Map
* @Description: map实现,所有的map业务都在这里进行Mapper后的四个参数分别为,输入key类型,输入value类型,输出key类型,输出value类型
* @author scc
* @date 2015年5月27日 下午2:30:06
* @
*/
public static class Map extends Mapper<LongWritable, Text, Text, Text>{
/**
* key:输入key
* value:输入value
* context:map上下文对象
* 说明,hdfs生成的所有键值对都会调用此方法
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try{
//得到日志每一行数据
String mapvalue=value.toString();
//日志具有固定格式,通过空格切分可以获得固定打下的string数组
String[] infos=mapvalue.split(" ");
//时间在数组的第一列,日志级别在数据的第九列,
String info=infos[10];
//调整数据格式(第一个参数为key,第二个参数为value),这里key和value都设置为日志级别
context.write(new Text(info), new Text(info));
}catch(Exception e){
//遇到错误是记录错误
context.getCounter(Counter.TIMER).increment(1);
return;
}
}
}
/**
*
* @ClassName: Reduce
* @Description: reduce处理类 ,Reducer四个参数,前两个是输入key和value的类型,必须和map一样,后两个是输出的key和value的类型
* @author scc
* @date 2015年5月27日 下午3:33:06
*
*/
public static class Reduce extends Reducer<Text, Text, Text, Text>{
/**
* 第一个参数输入的value,第二个参数是该key对应的所有的value集合,第三个是reducer的上下文
* 说明:与map不同的这里是对map处理后的数据进行的调用,当map处理后的key有重复时,这里传进来的key会是去重后的key,比方说在map里放进10个键值对,
* 其中有五个key是key1,有五个是key2,那么在reduce的时候只会调用两次reduce,分别是key1和key2
*/
@Override
protected void reduce(Text key, Iterable<Text> values,Context arg2) throws IOException, InterruptedException {
//获取当前遍历的key
String info=key.toString();
//计数器
int count=0;
//当值和key相同时计数器加1
for (Text text : values) {
if(info.equals(text.toString()))
count=count+1;
}
//将级别和对应的数据写出去
arg2.write(key, new Text(String.valueOf(count)));
}
}
/**
* run方法是一个入口
*/
@Override
public int run(String[] arg0) throws Exception {
//建立一个job,并指定job的名称
Job job=Job.getInstance(getConf(), "maptest");
//指定job的类
job.setJarByClass(MapReduceTest.class);
//设置日志文件路径(hdfs路径)
FileInputFormat.setInputPaths(job, new Path(arg0[0]));
//设置结果输出路径(hdfs路径)
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
//设置map处理类的class
job.setMapperClass(Map.class);
//设置reduce的class
job.setReducerClass(Reduce.class);
//设置输出格式化的类的class
job.setOutputFormatClass(TextOutputFormat.class);
//设置输出key的类型
job.setOutputKeyClass(Text.class);
//设置输出value的类型
job.setOutputValueClass(Text.class);
//设置等待job完成
job.waitForCompletion(true);
return job.isSuccessful()?0:1;
}
public static void main(String[] args) throws Exception {
String[] args2=new String[2];
args2[0]="hdfs://192.168.1.55:9000/test2-in/singlemaptest.log";
args2[1]="hdfs://192.168.1.55:9000/test2-out";
int res=ToolRunner.run(new Configuration(), new MapReduceTest(), args2);
System.exit(res);
}
}
下面是生成的结果:
INFO 3800
WARN 55