Hadoop MapReduce: writing a grouped-count-and-sort query - the sort step

Below is the sorting code:

package gruopcount;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class OrderGroupCount extends Configuration implements Tool{
    private Configuration configuration;
    @Override
    public Configuration getConf() {
        return this.configuration;
    }
    //counts input lines that fail to parse
    enum Counter{
        TIMER
    }
    @Override
    public void setConf(Configuration arg0) {
        this.configuration=arg0;
    }
    private static class Map extends Mapper<LongWritable, Text, DoubleWritable, Text>{

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, DoubleWritable,Text >.Context context) throws IOException, InterruptedException {
            try{
                String[] columns=value.toString().split("\t");
                //get the salesman
                String salesman=columns[0];
                //get the sales amount
                String salesmoney=columns[1];
                //emit the sales amount as the key so the framework sorts on it
                context.write(new DoubleWritable(Double.valueOf(salesmoney)),new Text(salesman));
            }catch(Exception e){
                //count the malformed line and skip it
                context.getCounter(Counter.TIMER).increment(1);
                e.printStackTrace();
                return;
            }
        }
    } 
    private static class Reduce extends Reducer<DoubleWritable, Text, Text, DoubleWritable>{

        @Override
        protected void reduce(DoubleWritable key, Iterable<Text> values, Reducer<DoubleWritable, Text, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
            //write each salesman (the value) back out with its sales amount (the key)
            for (Text v : values) {
                context.write(v,key);
            }
        }
    }
    //comparator that negates the default DoubleWritable ordering so the sales amounts come out in descending order
    private static class Sort extends WritableComparator{
        public Sort() {
            super(DoubleWritable.class,true);
        }
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        
    }
    //example partitioner that routes records to reducers by key prefix; only used if job.setPartitionerClass is
    //uncommented below (its key type would have to match the map output key for this job)
    private static class SectionPartitioner extends Partitioner<Text, Text>{

        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {
            String str = key.toString();  
            if (str.startsWith("zh")) {  
                return 0;  
            } else if (str.startsWith("l")) {  
                return 1;  
            } else {  
                return 2;  
            }  
        }
    } 
    @Override
    public int run(String[] arg0) throws Exception {
        Job job=Job.getInstance(getConf(), "groupcount");
        job.setJarByClass(OrderGroupCount.class);
        FileInputFormat.setInputPaths(job, new Path(arg0[1]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[2]));
        job.setMapperClass(Map.class);
        job.setSortComparatorClass(Sort.class);
        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        //the defaults are fine; for performance tuning, uncomment the lines below to customize combining and partitioning
        //set the combiner class
        //job.setCombinerClass(Reduce.class);
        //set the partitioner class
        //job.setPartitionerClass(SectionPartitioner.class);
        //set the number of reduce tasks
        //job.setNumReduceTasks(3);
        //map output is (sales amount, salesman); the final output is (salesman, sales amount)
        job.setMapOutputKeyClass(DoubleWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.waitForCompletion(true);
        return job.isSuccessful()?0:1;
    }
    public static void main(String[] args) throws Exception {
        String[] args2=new String[4];
        //args2[0] is just a placeholder; run() reads the input and output paths from indices 1 and 2
        args2[0]="ss";
        args2[1]="hdfs://192.168.1.55:9000/groupcount-in/t_product_sales.txt";
        args2[2]="hdfs://192.168.1.55:9000/groupcount-out";
        args2[3]="2015-05";
        Configuration configuration=new Configuration();
        configuration.set("querydate", args2[3]);
        //if the first job (the grouped count) succeeds, run the second (sorting) job
        if(0==ToolRunner.run(configuration, new GruopCount(), args2)){
            args2[1]="hdfs://192.168.1.55:9000/groupcount-out/part-r-00000";
            args2[2]="hdfs://192.168.1.55:9000/ordergroupcount-out";
            ToolRunner.run(configuration, new OrderGroupCount(), args2);
            //delete the intermediate output directory
            FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.1.55:9000"),configuration);
            Path path=new Path("/groupcount-out");
            if(fs.exists(path))
                fs.delete(path,true);
        }
    }
}

Here is the run log, followed by the sorted results:

11:54:54,443  INFO Job:1384 - Job job_local470508679_0002 completed successfully
11:54:54,443 DEBUG UserGroupInformation:1652 - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:764)
11:54:54,454  INFO Job:1391 - Counters: 38
    File System Counters
        FILE: Number of bytes read=903286
        FILE: Number of bytes written=1916480
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=973390
        HDFS: Number of bytes written=27282
        HDFS: Number of read operations=39
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=16
    Map-Reduce Framework
        Map input records=404
        Map output records=404
        Map output bytes=6840
        Map output materialized bytes=7654
        Input split bytes=117
        Combine input records=0
        Combine output records=0
        Reduce input groups=404
        Reduce shuffle bytes=7654
        Reduce input records=404
        Reduce output records=404
        Spilled Records=808
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=671088640
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=9094
    File Output Format Counters 
        Bytes Written=9094
11:54:54,454 DEBUG UserGroupInformation:1652 - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:322)
11:54:54,455 DEBUG UserGroupInformation:1652 - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:322)
11:54:54,455 DEBUG Client:1024 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop sending #45
11:54:54,456 DEBUG Client:1081 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop got value #45
11:54:54,457 DEBUG ProtobufRpcEngine:253 - Call: getFileInfo took 2ms
11:54:54,457 DEBUG Client:1024 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop sending #46
11:54:54,519 DEBUG Client:1081 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop got value #46
11:54:54,519 DEBUG ProtobufRpcEngine:253 - Call: delete took 62ms
11:54:54,521 DEBUG Client:97 - stopping client from cache: org.apache.hadoop.ipc.Client@7ff5b38d
11:54:54,521 DEBUG Client:103 - removing client from cache: org.apache.hadoop.ipc.Client@7ff5b38d
11:54:54,521 DEBUG Client:110 - stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@7ff5b38d
11:54:54,521 DEBUG Client:1234 - Stopping client
11:54:54,529 DEBUG Client:1184 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop: closed
11:54:54,529 DEBUG Client:979 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop: stopped, remaining connections 0

And the sorted output:

张三67    95750.46
张三15    93783.38
李四90    93780.04
麻子39    90876.53
张三10    90596.34
王二87    90354.13
张三78    89810.42000000001
王二8    89567.97000000002
王二97    88581.10999999999
王二62    88430.04000000001
王二21    88239.09000000001
张三11    87730.02
麻子89    87449.76
王二61    87189.91999999998
麻子72    87002.76000000001
麻子84    86922.65
李四97    86917.75
李四3    86378.59
王二57    86276.37000000001
………………

For comparison, the equivalent MySQL result (shown as a screenshot in the original post):
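The screenshot is not reproduced here. As a rough sketch only, the query being compared against would look something like the one below; the table name follows the input file t_product_sales.txt, while the column names (salesman, money, sale_date) are illustrative assumptions, not taken from the original post.

select salesman, sum(money) as total
from t_product_sales
where date_format(sale_date, '%Y-%m') = '2015-05'
group by salesman
order by total desc;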

If you want to package the job into a jar, the main method needs to be changed:

public static void main(String[] args) throws Exception {
//        String[] args2=new String[4];
//        args2[0]="ss";
//        args2[1]="hdfs://192.168.1.55:9000/groupcount-in/t_product_sales.txt";
//        args2[2]="hdfs://192.168.1.55:9000/groupcount-out";
//        args2[3]="2015-05";
        Configuration configuration=new Configuration();
        configuration.set("querydate", args[3]);
        //if the first job (the grouped count) succeeds, run the second (sorting) job
        if(0==ToolRunner.run(configuration, new GruopCount(), args)){
            args[1]="hdfs://192.168.1.55:9000/groupcount-out/part-r-00000";
            args[2]="hdfs://192.168.1.55:9000/ordergroupcount-out";
            ToolRunner.run(configuration, new OrderGroupCount(), args);
            //delete the intermediate output directory
            FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.1.55:9000"),configuration);
            Path path=new Path("/groupcount-out");
            if(fs.exists(path))
                fs.delete(path,true);
        }
    }

When running it, pay attention to the argument positions; the correct command looks like the following (assuming the packaged jar is named groupcount.jar and sits in a mylib directory under the Hadoop installation root):

bin/hadoop jar mylib/groupcount.jar gruopcount.OrderGroupCount /groupcount-in/t_product_sales.txt /ordergroupcount-out 2015-05