前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hadoop MapReduce编写一个分组统计并排序查询-分组

hadoop MapReduce编写一个分组统计并排序查询-分组

作者头像
尚浩宇
发布2018-08-17 10:09:39
9660
发布2018-08-17 10:09:39
举报
文章被收录于专栏:杂烩杂烩

说一下需求,有一张销售统计表,记录每个销售员每天的销售情况,现在要统计出某一月的每个销售员的销售情况并且按照销售额从高往低排序(hadoop默认是升序)。

首先在mysql里创建一张表

代码语言:javascript
复制
CREATE TABLE `t_product_sales` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `area` varchar(255) DEFAULT NULL COMMENT '地区',
  `salesman` varchar(255) DEFAULT NULL COMMENT '销售员姓名',
  `sales_money` decimal(10,0) DEFAULT NULL COMMENT '销售额',
  `sale_time` varchar(255) DEFAULT NULL COMMENT '销售日期(年-月-日)',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11717 DEFAULT CHARSET=utf8 COMMENT='销售统计表';

然后使用excl造数据,为了节约时间,这里制造了5月份的所有数据表数据

下面是在mysql里需求的sql语句

代码语言:javascript
复制
SELECT salesman,sum(sales_money) totalmoney
FROM `t_product_sales` 
WHERE sale_time like '2015-05%'
GROUP BY salesman
ORDER BY totalmoney DESC;

将数据从mysql中导出,并且导出成文本,文本数据,将导出的文本上传到hdfs中,放在根据经下的groupcount-in文件夹下。

我们先来分析下数据,在文本里每一行代表一个记录,在一行中以制表符区分字段。我们首先要取出所有五月份的数据,然后再对每一天的数据进行操作,取出每个人的销售额,最后排序。那么具体到程序上,map的任务就是取出五月份的的数据,reduce就是进行统计。好了,下面开始编写。

Eclipse创建项目,编写一个GroupCount类,下面是类代码:

GroupCount.java此类将数据提取出来,然后按销售员分组输出

代码语言:javascript
复制
package gruopcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GruopCount extends Configuration implements Tool{
    private Configuration configuration;
    @Override
    public Configuration getConf() {
        return this.configuration;
    }
    enum Counter{
        TIMER
    } 
    @Override
    public void setConf(Configuration arg0) {
        this.configuration=arg0;
    }
    private static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable>{

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text,DoubleWritable >.Context context) throws IOException, InterruptedException {
            try{
                //获取查询的时间,如2015-05
                String querydate=context.getConfiguration().get("querydate");
                String[] columns=value.toString().split("\t");
                String datadate=columns[4];
                //将要查询的月份的所有数据输出到reduce中
                if(datadate.startsWith(querydate)){
                    //获取销售员
                    String salesman=columns[2];
                    //获取销售额
                    String salesmoney=columns[3];
                    //将销售员作为key输出,输出结果形如{“张三1”, [100,200,300, …]},{“张三2”, [400,500,600, …]}
                    context.write(new Text(salesman),new DoubleWritable(Double.valueOf(salesmoney)));
                }
            }catch(Exception e){
                context.getCounter(Counter.TIMER).increment(1);
                e.printStackTrace();
                return;
            }
        }
    } 
    private static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable>{

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
            double sum=0;
            //获取当前遍历的value
            for (DoubleWritable v : values) {
                sum+=v.get();
            }
            context.write(key,new DoubleWritable(sum));
        }
    }
    @Override
    public int run(String[] arg0) throws Exception {
        Job job=Job.getInstance(getConf(), "groupcount");
        job.setJarByClass(GruopCount.class);
        FileInputFormat.setInputPaths(job, new Path(arg0[1]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[2]));
        
        //默认即可,若需要进行效率调优使用此代码自定义分片
        //设置要分片的calss
        //job.setCombinerClass(Reduce.class); 
        //设置分片calss
        //job.setPartitionerClass(SectionPartitioner.class);
        //设置分片个数
        //job.setNumReduceTasks(3);
        
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.waitForCompletion(true);
        return job.isSuccessful()?0:1;
    }
    public static void main(String[] args) throws Exception {
        String[] args2=new String[4];
        args2[0]="ss";
        args2[1]="hdfs://192.168.1.55:9000/groupcount-in/t_product_sales.txt";
        args2[2]="hdfs://192.168.1.55:9000/groupcount-out";
        args2[3]="2015-05";
        Configuration configuration=new Configuration();
        configuration.set("querydate", args2[3]);
        System.out.println(ToolRunner.run(configuration, new GruopCount(), args2));
    }
}

下面是运行日志和结果:

代码语言:javascript
复制
…………
11:47:14,882  INFO Job:1384 - Job job_local1510236679_0001 completed successfully
11:47:14,883 DEBUG UserGroupInformation:1652 - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:764)
11:47:14,893  INFO Job:1391 - Counters: 38
    File System Counters
        FILE: Number of bytes read=443980
        FILE: Number of bytes written=1171326
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=955202
        HDFS: Number of bytes written=9094
        HDFS: Number of read operations=13
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=11716
        Map output records=11716
        Map output bytes=198360
        Map output materialized bytes=221798
        Input split bytes=123
        Combine input records=0
        Combine output records=0
        Reduce input groups=404
        Reduce shuffle bytes=221798
        Reduce input records=11716
        Reduce output records=404
        Spilled Records=23432
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=6
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=494403584
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=477601
    File Output Format Counters 
        Bytes Written=9094
11:47:14,893 DEBUG UserGroupInformation:1652 - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:322)
11:47:14,893 DEBUG UserGroupInformation:1652 - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:322)
0
11:47:14,895 DEBUG Client:97 - stopping client from cache: org.apache.hadoop.ipc.Client@18657855
11:47:14,896 DEBUG Client:103 - removing client from cache: org.apache.hadoop.ipc.Client@18657855
11:47:14,896 DEBUG Client:110 - stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@18657855
11:47:14,896 DEBUG Client:1234 - Stopping client
11:47:14,896 DEBUG Client:1184 - IPC Client (846889180) connection to /192.168.1.55:9000 from hadoop: closed
11:47:14,896 DEBUG Client:979 - IPC Client (846889180) connection to /192.168.1.55:9000 from hadoop: stopped, remaining connections 0
代码语言:javascript
复制
张三1    63875.55
张三10    90596.34
张三100    59398.289999999986
张三101    73176.12
张三11    87730.02
张三12    80457.26
张三13    62335.88999999999
张三14    79966.65
张三15    93783.38
张三16    68459.31000000001
张三17    76030.59000000001
张三18    69015.17
张三19    81065.81999999998
张三2    71407.27
张三20    70510.29
张三21    54189.579999999994
张三22    80217.74
张三23    58507.200000000004
…………

mysql:

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2015/06/01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档