// 自定义分区、数据类型、排序、分组 — custom partitioner, key type, sort and grouping comparators (Hadoop secondary-sort example)
/**
 * Composite map-output key (year, temperature) used for secondary sort.
 * Natural order is ascending year, then ascending temperature; the job may
 * override this with a raw comparator (e.g. SortHot) at sort time.
 */
public class KeyPair implements WritableComparable<KeyPair> {
    private int year;    // calendar year extracted from the record timestamp
    private double hot;  // temperature value

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public double getHot() {
        return hot;
    }

    public void setHot(double hot) {
        this.hot = hot;
    }

    /** Deserializes the key: year first, then temperature (mirrors write). */
    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        hot = in.readDouble();
    }

    /** Serializes the key: year first, then temperature (mirrors readFields). */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeDouble(hot);
    }

    /** Natural order: ascending year, ties broken by ascending temperature. */
    @Override
    public int compareTo(KeyPair o) {
        int byYear = Integer.compare(year, o.getYear());
        return byYear != 0 ? byYear : Double.compare(hot, o.getHot());
    }

    @Override
    public String toString() {
        return year + "\t" + hot;
    }

    @Override
    public int hashCode() {
        long bits = Double.doubleToLongBits(hot);
        // Same arithmetic as the classic IDE-generated prime-31 hash:
        // 31 * (31 * 1 + hotBitsHash) + year.
        return 31 * (31 + (int) (bits ^ (bits >>> 32))) + year;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        KeyPair other = (KeyPair) obj;
        // Bit-compare doubles (handles NaN/-0.0 consistently with hashCode).
        return year == other.year
                && Double.doubleToLongBits(hot) == Double.doubleToLongBits(other.hot);
    }
}
// 自定义分区 — custom partitioner: records for the same year go to the same reducer
public class FirstPartition extends Partitioner<KeyPair, Text> {
    /**
     * Routes a record to a reducer based only on the year, so every record for
     * one year lands in the same partition.
     *
     * @param key   composite (year, temperature) map-output key
     * @param value original input line
     * @param num   number of reduce tasks
     * @return partition index in [0, num)
     */
    @Override
    public int getPartition(KeyPair key, Text value, int num) {
        // Multiplying by 127 spreads consecutive years across reducers.
        // Masking with Integer.MAX_VALUE clears the sign bit so a negative
        // product can never yield a negative (illegal) partition index.
        return (key.getYear() * 127 & Integer.MAX_VALUE) % num;
    }
}
// 自定义排序 — custom sort comparator: year ascending, temperature descending within a year
public class SortHot extends WritableComparator {
    public SortHot() {
        // true => instantiate KeyPair objects so compare() can read their fields.
        super(KeyPair.class, true);
    }

    /**
     * Orders keys ascending by year; within the same year, descending by
     * temperature, so the hottest record of each year arrives first.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair o1 = (KeyPair) a;
        KeyPair o2 = (KeyPair) b;
        int byYear = Integer.compare(o1.getYear(), o2.getYear());
        if (byYear != 0) {
            return byYear;
        }
        // Descending order: swap the operands instead of negating the result —
        // negating a comparator value is unsafe in general (Integer.MIN_VALUE
        // cannot be negated) and swapping states the intent directly.
        return Double.compare(o2.getHot(), o1.getHot());
    }
}
// 自定义分组 按年分组 — custom grouping comparator: group reducer input by year only
public class GroupHot extends WritableComparator {
    public GroupHot() {
        // true => instantiate KeyPair objects so compare() can read their fields.
        super(KeyPair.class, true);
    }

    /**
     * Treats all keys with the same year as one group, so a single reduce()
     * call receives every record for that year (temperature is ignored here).
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        int yearA = ((KeyPair) a).getYear();
        int yearB = ((KeyPair) b).getYear();
        return Integer.compare(yearA, yearB);
    }
}
package com.hadoop.MaxHot;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RunHot2 {
public static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
static class HotMapper extends Mapper<LongWritable, Text, KeyPair, Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, KeyPair, Text>.Context context)
throws IOException, InterruptedException {
String line=value.toString();
String[] ss=line.split("\t");
System.out.println("------------>"+ss.length);
if(ss.length==2){
try {
//得到日期 通过解析
Date date=sdf.parse(ss[0]);
Calendar c=Calendar.getInstance();
c.setTime(date);
int year=c.get(1);
System.out.println(year);
String hot=ss[1].substring(0, ss[1].indexOf("c"));
KeyPair kp=new KeyPair();
kp.setYear(year);
kp.setHot(Double.parseDouble(hot));
context.write(kp, value);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
static class HotReduce extends Reducer<KeyPair, Text, KeyPair, Text>{
@Override
protected void reduce(KeyPair kp, Iterable<Text> values, Reducer<KeyPair, Text, KeyPair, Text>.Context context)
throws IOException, InterruptedException {
for(Text val:values){
context.write(kp, val);
}
}
}
public static void main(String[] args) {
Configuration conf=new Configuration();
try {
//得到一个Job 并设置名字
Job job=Job.getInstance(conf,"hot");
//设置Jar 使本程序在Hadoop中运行
job.setJarByClass(RunHot2.class);
//设置Map处理类
job.setMapperClass(HotMapper.class);
//设置map的输出类型,因为不一致,所以要设置
job.setMapOutputKeyClass(KeyPair.class);
job.setMapOutputValueClass(Text.class);
job.setNumReduceTasks(3);
job.setPartitionerClass(FirstPartition.class);
job.setSortComparatorClass(SortHot.class);
job.setGroupingComparatorClass(GroupHot.class);
//设置Reduce处理类
job.setReducerClass(HotReduce.class);
//设置输入和输出目录
FileInputFormat.addInputPath(job, new Path("hdfs://192.168.52.140:9000/outdata/in/"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.52.140:9000/outdata/out"+System.currentTimeMillis()));
//启动运行
System.exit(job.waitForCompletion(true) ? 0:1);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}