I. Overview
MapReduce lets you plug in custom InputFormat and OutputFormat implementations, so in principle a MapReduce job can read from and write to any data source. This example reads plain text from HDFS and writes the word-count result into an HBase table via TableOutputFormat.
II. Steps
The goal is to write the word-count results back into an HBase table. The table has to exist before the job runs, so we create it first (see the sketch below).
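From the HBase shell the command is create 'wc','cf'; the same thing can be done from Java through the Admin API. A minimal sketch, assuming an HBase 1.x client (HTableDescriptor/HColumnDescriptor are deprecated in 2.x); the class name CreateWcTable is just an illustration, and the connection settings mirror the driver below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
public class CreateWcTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node4"); // same ZooKeeper quorum as the job below
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("wc");
            if (!admin.tableExists(name)) {
                // equivalent to the shell command: create 'wc','cf'
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("cf"));
                admin.createTable(desc);
            }
        }
    }
}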
2.1 The main function
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * Counts words in a text file on HDFS and writes the result to an
 * HBase table, created beforehand with: create 'wc','cf'
 * rowkey = the word, cf:count = the word's count
 */
public class WCDemo {
/**
 * Driver: reads text from HDFS, maps each word to (word, 1),
 * and reduces the counts into Put objects for the 'wc' table.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // HDFS NameNode address (or the nameservice name on an HA cluster)
    conf.set("fs.defaultFS", "hdfs://node1:8020");
    // ZooKeeper quorum through which the HBase cluster is located
    conf.set("hbase.zookeeper.quorum", "node4");

    Job job = Job.getInstance(conf);
    job.setJarByClass(WCDemo.class);
    job.setMapperClass(WCMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // No setReducerClass() call is needed: initTableReducerJob registers the
    // reducer and configures TableOutputFormat to write to the 'wc' table.
    // On a cluster the short overload is enough:
    //     TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job);
    // For a local run, pass addDependencyJars = false (the last argument):
    TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job,
            null, null, null, null, false);

    Path path = new Path("/user/wc");
    FileInputFormat.addInputPath(job, path);

    boolean flag = job.waitForCompletion(true);
    if (flag) {
        System.out.println("success~~");
    }
}
}
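To launch the job, either package the classes into a jar and submit it with hadoop jar, or run the driver directly from an IDE in local mode, which is what addDependencyJars = false is for; in both cases the HBase client jars have to be on the classpath.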
2.2 The Mapper (no different from an ordinary MapReduce Mapper, since the input side is still plain text on HDFS)
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // reuse the writables across calls instead of allocating per record
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split the line on spaces and emit (word, 1) for every token
        String[] words = value.toString().split(" ");
        for (String w : words) {
            word.set(w);
            context.write(word, ONE);
        }
    }
}
2.3 The Reducer (its main job is to build a Put per word and write it out)
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class WCReducer extends
        TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text text, Iterable<IntWritable> iterable,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : iterable) {
            sum += i.get();
        }
        // rowkey = the word, cf:count = its total count
        Put put = new Put(Bytes.toBytes(text.toString()));
        // on HBase clients older than 1.0, use put.add(...) instead
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"),
                Bytes.toBytes(String.valueOf(sum)));
        // TableOutputFormat ignores the key, so null is fine here
        context.write(null, put);
    }
}
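Once the job reports success, the counts can be checked from the HBase shell with scan 'wc', or programmatically with a Get. A minimal sketch, assuming the word "hello" actually occurred in the input (the class name WCCheck is just an illustration; the quorum setting mirrors the driver above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class WCCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node4");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("wc"))) {
            // read back the count stored under rowkey "hello"
            Result result = table.get(new Get(Bytes.toBytes("hello")));
            byte[] count = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count"));
            System.out.println("hello -> "
                    + (count == null ? "not found" : Bytes.toString(count)));
        }
    }
}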