前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >自定义分区、数据类型、排序、分组

自定义分区、数据类型、排序、分组

作者头像
汤高
发布2018-01-11 16:20:16
7720
发布2018-01-11 16:20:16
举报
文章被收录于专栏:积累沉淀积累沉淀

自定义分区、数据类型、排序、分组

代码语言:java
复制
/**
 * Composite map-output key pairing a year with a temperature reading.
 * Serialized via the Writable protocol; natural order is ascending by
 * year, then ascending by temperature. The custom sort/grouping
 * comparators in this job override that order where needed.
 */
public class KeyPair implements WritableComparable<KeyPair> {
    private int year;
    private double hot;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public double getHot() {
        return hot;
    }

    public void setHot(double hot) {
        this.hot = hot;
    }

    /** Deserializes the fields in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        hot = in.readDouble();
    }

    /** Serializes year first, then temperature. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeDouble(hot);
    }

    /** Ascending by year; ties broken ascending by temperature. */
    @Override
    public int compareTo(KeyPair other) {
        int byYear = Integer.compare(year, other.year);
        return byYear != 0 ? byYear : Double.compare(hot, other.hot);
    }

    @Override
    public String toString() {
        return year + "\t" + hot;
    }

    @Override
    public int hashCode() {
        long bits = Double.doubleToLongBits(hot);
        int result = 31 + (int) (bits ^ (bits >>> 32));
        return 31 * result + year;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        KeyPair that = (KeyPair) obj;
        // Compare doubles via their bit patterns, matching hashCode.
        return year == that.year
                && Double.doubleToLongBits(hot) == Double.doubleToLongBits(that.hot);
    }
}
代码语言:java
复制
/**
 * Custom partitioner: routes every record of the same year to the same
 * reducer, spreading years across partitions by a simple multiplicative
 * hash.
 */
public class FirstPartition extends Partitioner<KeyPair, Text> {

    /**
     * @param key   composite (year, temperature) key
     * @param value original input line
     * @param num   number of reduce tasks
     * @return partition index in [0, num)
     */
    @Override
    public int getPartition(KeyPair key, Text value, int num) {
        // Mask off the sign bit before taking the modulus: a negative year
        // (or int overflow of year * 127) would otherwise produce a negative
        // index, which Hadoop rejects at runtime.
        return (key.getYear() * 127 & Integer.MAX_VALUE) % num;
    }

}
代码语言:java
复制
/**
 * Custom sort comparator: orders map output ascending by year, and within
 * a year descending by temperature, so each reduce group sees its hottest
 * reading first.
 */
public class SortHot extends WritableComparator {

    public SortHot() {
        // true: let the parent instantiate KeyPair objects for comparison.
        super(KeyPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair o1 = (KeyPair) a;
        KeyPair o2 = (KeyPair) b;
        int res = Integer.compare(o1.getYear(), o2.getYear());
        if (res != 0) {
            return res;
        }
        // Descending temperature: swap the operands rather than negating the
        // result — the compare contract only guarantees the sign, and negating
        // a returned Integer.MIN_VALUE would silently invert nothing.
        return Double.compare(o2.getHot(), o1.getHot());
    }

}
代码语言:java
复制
/**
 * Custom grouping comparator: two keys belong to the same reduce group
 * if and only if their years match, so one reduce() call sees a whole
 * year's worth of values.
 */
public class GroupHot extends WritableComparator {

    public GroupHot() {
        // true: let the parent instantiate KeyPair objects for comparison.
        super(KeyPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        int yearA = ((KeyPair) a).getYear();
        int yearB = ((KeyPair) b).getYear();
        return Integer.compare(yearA, yearB);
    }

}
代码语言:java
复制
 package com.hadoop.MaxHot;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * Driver for the per-year hottest-temperature job. Input lines have the
 * form "yyyy-MM-dd HH:mm:ss\t&lt;temp&gt;c"; records are keyed by
 * (year, temperature), partitioned by year, sorted hottest-first and
 * grouped by year (see FirstPartition / SortHot / GroupHot).
 */
public class RunHot2 {
    // NOTE(review): SimpleDateFormat is not thread-safe; sharing a static
    // instance is safe here only because each map task runs single-threaded.
    public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    static class HotMapper extends Mapper<LongWritable, Text, KeyPair, Text> {
        /**
         * Parses one "timestamp\ttemperature" line into a KeyPair key.
         * Malformed records are skipped rather than failing the task.
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, KeyPair, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] ss = line.split("\t");
            if (ss.length == 2) {
                try {
                    // Extract the year from the timestamp column.
                    Date date = sdf.parse(ss[0]);
                    Calendar c = Calendar.getInstance();
                    c.setTime(date);
                    int year = c.get(Calendar.YEAR); // was the magic number 1
                    // The temperature column looks like "32c"; cut at the unit
                    // suffix, skipping the record when the suffix is absent.
                    int unit = ss[1].indexOf('c');
                    if (unit < 0) {
                        return;
                    }
                    KeyPair kp = new KeyPair();
                    kp.setYear(year);
                    kp.setHot(Double.parseDouble(ss[1].substring(0, unit)));
                    context.write(kp, value);
                } catch (ParseException | NumberFormatException e) {
                    // Bad record: report it and keep processing the split.
                    e.printStackTrace();
                }
            }
        }
    }

    static class HotReduce extends Reducer<KeyPair, Text, KeyPair, Text> {
        /** Emits every (key, value) pair of the group unchanged. */
        @Override
        protected void reduce(KeyPair kp, Iterable<Text> values, Reducer<KeyPair, Text, KeyPair, Text>.Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(kp, val);
            }
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf, "hot");
            job.setJarByClass(RunHot2.class);
            job.setMapperClass(HotMapper.class);
            // Map output types differ from the job output types, so they
            // must be declared explicitly.
            job.setMapOutputKeyClass(KeyPair.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(3);
            job.setPartitionerClass(FirstPartition.class);
            job.setSortComparatorClass(SortHot.class);
            job.setGroupingComparatorClass(GroupHot.class);
            job.setReducerClass(HotReduce.class);
            FileInputFormat.addInputPath(job, new Path("hdfs://192.168.52.140:9000/outdata/in/"));
            // Timestamped output directory avoids "output path already
            // exists" failures on reruns.
            FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.52.140:9000/outdata/out" + System.currentTimeMillis()));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt status for any outer handler.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-06-15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档