Learning Hadoop's serialization mechanism, one confusion at a time — a traffic-sum MapReduce case study, plus sorting the summed traffic

I: The concept of serialization

Serialization is the process of turning a structured object into a byte stream. Deserialization is the reverse: turning a byte stream back into a structured object. Java's built-in mechanism is java.io.Serializable.
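To make the round trip concrete, here is a minimal sketch using Java's built-in serialization. The `Record` class is a hypothetical stand-in for illustration, not part of the article's code:

```java
import java.io.*;

// Minimal round trip with java.io.Serializable: an object is turned
// into a byte stream, then restored from it.
public class SerializationDemo {
    static class Record implements Serializable {
        private static final long serialVersionUID = 1L;
        String phone;
        long upFlow;
        Record(String phone, long upFlow) {
            this.phone = phone;
            this.upFlow = upFlow;
        }
    }

    public static void main(String[] args) throws Exception {
        Record original = new Record("13726230503", 2481L);

        // serialization: object -> byte stream
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(original);
        }

        // deserialization: byte stream -> object
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(buffer.toByteArray()))) {
            Record restored = (Record) in.readObject();
            System.out.println(restored.phone + " " + restored.upFlow);
        }
    }
}
```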

II: Characteristics of Hadoop serialization

(1) Properties of the serialization format:
- Compact: uses storage space efficiently.
- Fast: little extra overhead when reading and writing data.
- Extensible: can transparently read data written in an older format.
- Interoperable: supports interaction across languages.

(2) Hadoop's serialization format: the Writable interface.
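The "compact" point is easy to demonstrate with the JDK alone: compare the bytes produced by Java object serialization with a raw `DataOutputStream` write, which is the style Hadoop's Writable uses under the hood. A single `long` is exactly 8 bytes in the raw form, while object serialization adds class metadata around it:

```java
import java.io.*;

// Rough size comparison: one long written with Java object
// serialization vs. a raw DataOutputStream.
public class CompactDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream javaSer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(javaSer)) {
            out.writeObject(Long.valueOf(24681L)); // boxed long plus stream/class headers
        }

        ByteArrayOutputStream rawSer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(rawSer)) {
            out.writeLong(24681L); // exactly 8 bytes
        }

        System.out.println("ObjectOutputStream bytes: " + javaSer.size());
        System.out.println("DataOutputStream bytes: " + rawSer.size());
    }
}
```

The exact object-serialization size depends on the JDK, but it is always well above the 8 bytes of the raw write.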

III: What Hadoop serialization is for

(1) Serialization serves two main purposes in a distributed environment: inter-process communication and persistent storage. (2) It is how Hadoop nodes talk to each other.

IV: The Writable interface (classes that need serialization implement it)

(1) Writable is a simple, efficient serialization interface built on DataInput and DataOutput. (2) Every key and value type in MapReduce must implement Writable. (3) Every key type must additionally implement WritableComparable.

1: Create a FlowBean entity class that implements serialization:

package com.flowSum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/***
 *
 * @author Administrator
 * 1: write() serializes each object into the output stream
 * 2: readFields() deserializes an object from the input stream bytes
 * 3: to make beans sortable, implement WritableComparable;
 *    for Java value objects you generally also override toString(), hashCode() and equals()
 *
 */
public class FlowBean implements Writable{

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // a parameterized constructor, to make initializing the object's data convenient
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    // during deserialization, reflection needs a no-arg constructor, so define one
    public FlowBean() {
    }

    // override toString()
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // deserialize the object's data from the data stream
    // fields must be read in exactly the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // serialize the object's data into the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

}
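A quick way to convince yourself that `write()` and `readFields()` mirror each other is a round trip through a byte buffer. The sketch below uses a simplified copy of FlowBean built on the JDK's `DataInput`/`DataOutput` only, so it runs without Hadoop on the classpath; the real class implements `org.apache.hadoop.io.Writable` with the same two methods:

```java
import java.io.*;

// Round-trip check for the write()/readFields() pair, using only
// java.io.DataInput/DataOutput (the interfaces Writable is built on).
public class FlowBeanRoundTrip {
    static class FlowBean {
        String phoneNumber;
        long upFlow, downFlow, sumFlow;

        void write(DataOutput out) throws IOException {
            out.writeUTF(phoneNumber);
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        // field order must match write() exactly
        void readFields(DataInput in) throws IOException {
            phoneNumber = in.readUTF();
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    }

    public static void main(String[] args) throws Exception {
        FlowBean bean = new FlowBean();
        bean.phoneNumber = "13726230503";
        bean.upFlow = 2481;
        bean.downFlow = 24681;
        bean.sumFlow = bean.upFlow + bean.downFlow;

        // serialize into a buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        bean.write(new DataOutputStream(buffer));

        // deserialize from the same bytes
        FlowBean restored = new FlowBean();
        restored.readFields(new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(restored.phoneNumber + "\t" + restored.upFlow
                + "\t" + restored.downFlow + "\t" + restored.sumFlow);
    }
}
```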

Create a FlowSumMapper class that extends the Mapper class:

package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/***
 *
 * @author Administrator
 * 1: FlowBean is our own data type; to travel between Hadoop nodes it must follow
 *    Hadoop's serialization rules, i.e. implement the corresponding Hadoop interface
 * 2: Text can be thought of as the Writable equivalent of java.lang.String, for UTF-8 sequences
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    // take one line of the log, split it into fields, and extract the ones we need:
    // phone number, upstream traffic, downstream traffic; then emit them as a key-value pair

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // grab one line of data
        String line = value.toString();
        // split it into fields
        String[] fields = StringUtils.split(line, "\t");
        // the phone number field
        String phoneNumber = fields[1];
        // the upstream traffic field
        long up_flow = Long.parseLong(fields[7]);
        // the downstream traffic field
        long down_flow = Long.parseLong(fields[8]);

        // finally, wrap the data in a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }

}
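The mapper's field extraction can be exercised on its own without a cluster. Note that commons-lang's `StringUtils.split` treats runs of adjacent separators as one, so the sketch below uses the regex `"\t+"` with plain `String.split` to match that behavior; the sample line is shaped like the article's test data (host present, category column empty):

```java
// Standalone version of the mapper's field extraction: split one log
// line on tabs and pull out phone number (index 1), upstream traffic
// (index 7) and downstream traffic (index 8).
public class ExtractFieldsDemo {
    public static void main(String[] args) {
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC"
                + "\t120.196.100.82\ti02.c.aliimg.com\t\t24\t27\t2481\t24681\t200";

        // "\t+" collapses adjacent tabs, mirroring StringUtils.split
        String[] fields = line.split("\t+");
        String phoneNumber = fields[1];
        long upFlow = Long.parseLong(fields[7]);
        long downFlow = Long.parseLong(fields[8]);

        System.out.println(phoneNumber + " " + upFlow + " " + downFlow);
    }
}
```

Because empty columns are collapsed, lines with a different number of filled columns shift the indices around — which is exactly the kind of dirty-data problem that shows up later in this article.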

Create a FlowSumReducer class that extends the Reducer class:

package com.flowSum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

    // the framework calls our reduce method once per group <phone, {flowbean, flowbean, ...}>
    // the reduce logic is simply: iterate over the values, accumulate the sums, and emit
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // counters for upstream and downstream traffic
        long up_flow_counter = 0;
        long down_flow_counter = 0;

        // accumulate upstream and downstream traffic
        for(FlowBean bean : values){
            up_flow_counter += bean.getUpFlow();
            down_flow_counter += bean.getDownFlow();
        }

        // emit the result
        context.write(key, new FlowBean(key.toString(), up_flow_counter, down_flow_counter));
    }

}
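What the shuffle plus this reducer amount to can be sketched in plain Java: group (up, down) pairs by phone number and sum per phone. The sample numbers below are taken from the article's test data for two of the phones; the map-based grouping stands in for what the framework does between map and reduce:

```java
import java.util.*;

// Plain-Java sketch of "group by phone, sum up/down flows" — the
// same aggregation the reducer performs after the shuffle.
public class FlowSumSketch {
    public static void main(String[] args) {
        String[] phones = { "13726230503", "13560439658", "13560439658" };
        long[][] flows  = { {2481, 24681}, {918, 4938}, {1116, 954} };

        // phone -> {upTotal, downTotal}
        Map<String, long[]> totals = new TreeMap<>();
        for (int i = 0; i < phones.length; i++) {
            long[] sum = totals.computeIfAbsent(phones[i], k -> new long[2]);
            sum[0] += flows[i][0]; // upstream
            sum[1] += flows[i][1]; // downstream
        }

        for (Map.Entry<String, long[]> e : totals.entrySet()) {
            long up = e.getValue()[0], down = e.getValue()[1];
            System.out.println(e.getKey() + "\t" + up + "\t" + down + "\t" + (up + down));
        }
    }
}
```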

Create a FlowSumRunner class that extends Configured and implements Tool — the standard pattern for describing and submitting a job:

package com.flowSum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// NOTE: this Text import is wrong (it should be org.apache.hadoop.io.Text) — it comes back to bite us below
import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
/***
 *
 * @author Administrator
 * 1: the standard pattern for describing and submitting a job
 */
public class FlowSumRunner extends Configured implements Tool{


    @Override
    public int run(String[] args) throws Exception {
        // create the configuration
        Configuration conf = new Configuration();
        // get a job instance
        Job job = Job.getInstance(conf);

        // tell the job which jar contains the classes it uses
        job.setJarByClass(FlowSumRunner.class);

        // the mapper and reducer classes this job uses
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // the mapper's output key-value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // the reducer's output key-value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // where the input data lives
        // FileInputFormat is the base class of all file-based InputFormat implementations;
        // it keeps track of the job's input files and implements the logic that computes input splits.
        // how individual records are read is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // where the results should be written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster;
        // returns 0 on success, 1 otherwise
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // the standard way to invoke the job
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        // exit when finished
        System.exit(res);
    }

}

Then package it up and upload it to the VM, along with some test data (process omitted). Here is the test data:

1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157995052     13826544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4            4    0    264    0    200
1363157991076     13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
1363154400022     13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
1363157993044     18211575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
1363157995074     84138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
1363157993055     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
1363157995033     15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    3156    2936    200
1363157983019     13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
1363157984041     13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    6960    690    200
1363157973098     15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    3659    3538    200
1363157986029     15989002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    1938    180    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157984040     13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    1938    2910    200
1363157995093     13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
1363157982040     13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    7335    110349    200
1363157986072     18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    2412    200
1363157990043     13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    48243    200
1363157988072     13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
1363157985066     13726238888    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157993055     13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200

You can see the jar and the test data have been uploaded to the VM.

Then upload the data to the HDFS cluster (a pseudo-distributed cluster here).

First create an empty directory /flow on the cluster, then a data directory inside it, and finally put the data file into data.

Then run the program. Since it takes arguments, note that the last two items on the command line are the input and output paths.

And then it threw an error that left me completely confused:

Error: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
    at java.lang.Class.asSubclass(Class.java:3165)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:884)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:981)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:80)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:675)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:747)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

Working on the assumption that whatever you are learning has already been learned (and broken) by someone else, and that some kind soul has posted the fix, a quick search turned up the answer:

The wrong Text class had been imported (be careful with this one, or you're in for a world of pain). Not import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text; but import org.apache.hadoop.io.Text;

After repackaging and uploading to the VM, the next run produced this error:

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/flow/output already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
    at com.flowSum.FlowSumRunner.run(FlowSumRunner.java:55)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.flowSum.FlowSumRunner.main(FlowSumRunner.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

So delete the /flow/output directory — the output directory must not exist beforehand, because the job creates it itself.

Then run the program again (it takes arguments; the last two are the input and output paths).

This time it threw an array index out of bounds exception; my guess was that the test data wasn't clean:

Error: java.lang.ArrayIndexOutOfBoundsException: 1
    at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:29)
    at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

So I hand-crafted a data set, shown below:

(As it turned out, the original test data ran fine later on. In short, test a few times; pitfalls everywhere!)

1363157985066      13726230501      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      2481      241      200
1363157985061      13726230502      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481      681      200
1363157985062      13726230502      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      3481     681      200
1363157985063      13726230503      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      4481     4681      200
1363157985064      13726230504      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      5481     4681      200
1363157985065      13726230505      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      6481     2681      200
1363157985066      13726230506      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      7481     2481      200
1363157985067      13726230507      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      8481     2461      200
1363157985067      13726230507      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     281      200
1363157985068      13726230508      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      2481     2681     200
1363157985068      13726230508      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      3481     24681     200
1363157985069      13726230509      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      4481     681     200
1363157985060      13726230500      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     24681     200
1363157985061      13726230501      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     681     200
1363157985066      13726230502      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     81         200
1363157985063      13726230503      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     681     200
1363157985063      13726230504      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     681     200
1363157985064      13726230505      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     2681     200
1363157985065      13726230506      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     681     200
1363157985066      13726230507      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      81      24681     200
1363157985067      13726230508      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      481     241     200
1363157985068      13726230508      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      481     681     200
1363157985068      13726230503      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      241     681     200

Finally I changed String[] fields = StringUtils.split(line, "\t"); to String[] fields = StringUtils.split(line, " ");

(Later testing showed that String[] fields = StringUtils.split(line, "\t"); works too. At first I suspected the amount of whitespace mattered, but the code was fine; it was the test data all along.)
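The delimiter confusion is easier to see in isolation. Commons-lang's `StringUtils.split` collapses adjacent separators, while plain `String.split` keeps the empty fields between them, so the same line can yield different field indices depending on which you use. And splitting a tab-separated line on a space finds no separators at all, which is exactly the setup for an `ArrayIndexOutOfBoundsException`:

```java
import java.util.Arrays;

// How the split behavior differs: empty fields kept vs. collapsed,
// and what happens when the delimiter simply isn't in the line.
public class SplitDemo {
    public static void main(String[] args) {
        String line = "a\tb\t\tc";

        System.out.println(Arrays.toString(line.split("\t")));  // keeps the empty field
        System.out.println(Arrays.toString(line.split("\t+"))); // collapses, like StringUtils.split
        System.out.println(line.split(" ").length);             // no spaces: 1 token, fields[1] would throw
    }
}
```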

package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/***
 *
 * @author Administrator
 * 1: FlowBean is our own data type; to travel between Hadoop nodes it must follow
 *    Hadoop's serialization rules, i.e. implement the corresponding Hadoop interface
 * 2: Text can be thought of as the Writable equivalent of java.lang.String, for UTF-8 sequences
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    // take one line of the log, split it into fields, and extract the ones we need:
    // phone number, upstream traffic, downstream traffic; then emit them as a key-value pair

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // grab one line of data
        String line = value.toString();
        // split it into fields (on spaces here; "\t" also worked once the data was cleaned up)
        String[] fields = StringUtils.split(line, " ");
        // the phone number field
        String phoneNumber = fields[1];
        // the upstream traffic field
        long up_flow = Long.parseLong(fields[7]);
        // the downstream traffic field
        long down_flow = Long.parseLong(fields[8]);

        // finally, wrap the data in a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }

}

Package, upload to the VM, and run. A successful run looks like this:

[root@master hadoop]# hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output
17/09/20 09:35:26 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/20 09:35:26 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/20 09:35:27 INFO input.FileInputFormat: Total input paths to process : 1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: number of splits:1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505814887677_0007
17/09/20 09:35:27 INFO impl.YarnClientImpl: Submitted application application_1505814887677_0007
17/09/20 09:35:27 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505814887677_0007/
17/09/20 09:35:27 INFO mapreduce.Job: Running job: job_1505814887677_0007
17/09/20 09:35:33 INFO mapreduce.Job: Job job_1505814887677_0007 running in uber mode : false
17/09/20 09:35:33 INFO mapreduce.Job:  map 0% reduce 0%
17/09/20 09:35:37 INFO mapreduce.Job:  map 100% reduce 0%
17/09/20 09:35:43 INFO mapreduce.Job:  map 100% reduce 100%
17/09/20 09:35:43 INFO mapreduce.Job: Job job_1505814887677_0007 completed successfully
17/09/20 09:35:43 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1179
        FILE: Number of bytes written=187971
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=2467
        HDFS: Number of bytes written=279
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2691
        Total time spent by all reduces in occupied slots (ms)=2582
        Total time spent by all map tasks (ms)=2691
        Total time spent by all reduce tasks (ms)=2582
        Total vcore-seconds taken by all map tasks=2691
        Total vcore-seconds taken by all reduce tasks=2582
        Total megabyte-seconds taken by all map tasks=2755584
        Total megabyte-seconds taken by all reduce tasks=2643968
    Map-Reduce Framework
        Map input records=23
        Map output records=23
        Map output bytes=1127
        Map output materialized bytes=1179
        Input split bytes=93
        Combine input records=0
        Combine output records=0
        Reduce input groups=10
        Reduce shuffle bytes=1179
        Reduce input records=23
        Reduce output records=10
        Spilled Records=46
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=126
        CPU time spent (ms)=1240
        Physical memory (bytes) snapshot=218099712
        Virtual memory (bytes) snapshot=726839296
        Total committed heap usage (bytes)=137433088
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=2374
    File Output Format Counters
        Bytes Written=279
[root@master hadoop]#

The output looks like this:

All in all, errors are inevitable when learning something new; calm down and work through them.

2: Sorting the summed traffic, a hands-on case:

This time the Mapper and Reducer are both written as static inner classes. (I hit the same maddening problem as above: String[] fields = StringUtils.split(line, "\t"); simply would not run and kept throwing array index out of bounds, so I switched to StringUtils.split(line, " ") and it ran. Baffling.)

package com.flowSort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FlowSortMapReduce {

    /***
     * the mapper, as a static inner class
     * @author Administrator
     *
     */
    public static class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{

        // take one line, split out the fields, wrap them in a FlowBean, and emit the bean as the key
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            // grab one line of data
            String line = value.toString();
            // split this line into fields (the input is the previous job's tab-separated output)
            String[] fields = StringUtils.split(line, "\t");

            // pull the individual values out
            String phoneNumber = fields[0];
            long up_flow = Long.parseLong(fields[1]);
            long down_flow = Long.parseLong(fields[2]);

            // wrap the data and pass it on to reduce
            context.write(new FlowBean(phoneNumber, up_flow, down_flow), NullWritable.get());
        }

    }

    /***
     * the reducer, as a static inner class
     * @author Administrator
     *
     */
    public static class FlowSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values,Context context)
                throws IOException, InterruptedException {

            String phoneNumber = key.getPhoneNumber();
            context.write(new Text(phoneNumber), key);
        }
    }


    /***
     * main method
     * @param args
     * @throws InterruptedException
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        // create the configuration
        Configuration conf = new Configuration();
        // get a job instance
        Job job = Job.getInstance(conf);

        // tell the job which jar contains the classes it uses
        job.setJarByClass(FlowSortMapReduce.class);

        // the mapper and reducer classes this job uses
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // the mapper's output key-value types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // the reducer's output key-value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // where the input data lives
        // FileInputFormat is the base class of all file-based InputFormat implementations;
        // it keeps track of the job's input files and implements the logic that computes input splits.
        // how individual records are read is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // where the results should be written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster; returns 0 on success, 1 otherwise
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

Rework the entity class so it sorts by total traffic:

package com.flowSort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/***
 *
 * @author Administrator
 * 1: write() serializes each object into the output stream
 * 2: readFields() deserializes an object from the input stream bytes
 * 3: implements WritableComparable so the bean can be used as a sortable key;
 *    for Java value objects you generally also override toString(), hashCode() and equals()
 *
 */
public class FlowBean implements WritableComparable<FlowBean>{


    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // a parameterized constructor, to make initializing the object's data convenient
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    // during deserialization, reflection needs a no-arg constructor, so define one
    public FlowBean() {
    }

    // override toString()
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // deserialize the object's data from the data stream
    // fields must be read in exactly the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // serialize the object's data into the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // comparison by total traffic
    @Override
    public int compareTo(FlowBean o) {

        // larger totals sort first, i.e. descending order; never return 0,
        // because keys that compare as equal would be merged into one reduce group
        return sumFlow > o.sumFlow ? -1 : 1;
    }

}
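The reason compareTo never returns 0 deserves a note: in MapReduce, keys that compare as equal land in the same reduce group, so two phones with the same total would collapse into a single output line. Returning only -1/1 keeps ties apart while still sorting by sumFlow descending. The descending sort itself can be sketched with a plain comparator on a hypothetical record class (here the comparator uses `Long.compare` so the list sort stays contract-safe; the never-zero trick only matters for MapReduce grouping):

```java
import java.util.*;

// Sorting records by total traffic, descending — the ordering the
// FlowBean.compareTo above produces. Two records share a total to
// show that a list sort keeps them as separate entries.
public class SortBySumFlow {
    static class FlowRecord {
        String phone;
        long sumFlow;
        FlowRecord(String phone, long sumFlow) {
            this.phone = phone;
            this.sumFlow = sumFlow;
        }
    }

    public static void main(String[] args) {
        List<FlowRecord> records = new ArrayList<>(Arrays.asList(
                new FlowRecord("13726230503", 27162),
                new FlowRecord("13726238888", 27162), // same total, must stay separate
                new FlowRecord("13925057413", 11121)));

        // bigger sumFlow first; stable sort keeps tied records in input order
        records.sort((a, b) -> Long.compare(b.sumFlow, a.sumFlow));

        for (FlowRecord r : records) {
            System.out.println(r.phone + "\t" + r.sumFlow);
        }
    }
}
```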

The result looks like this — problems every step of the way, but it works:

[root@master hadoop]# hadoop jar flowsort.jar com.flowSort.FlowSortMapReduce /flow/output4 /flow/sortoutput
17/09/21 19:32:28 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/21 19:32:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/21 19:32:29 INFO input.FileInputFormat: Total input paths to process : 1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: number of splits:1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505991512603_0004
17/09/21 19:32:29 INFO impl.YarnClientImpl: Submitted application application_1505991512603_0004
17/09/21 19:32:29 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505991512603_0004/
17/09/21 19:32:29 INFO mapreduce.Job: Running job: job_1505991512603_0004
17/09/21 19:32:33 INFO mapreduce.Job: Job job_1505991512603_0004 running in uber mode : false
17/09/21 19:32:33 INFO mapreduce.Job:  map 0% reduce 0%
17/09/21 19:32:38 INFO mapreduce.Job:  map 100% reduce 0%
17/09/21 19:32:44 INFO mapreduce.Job:  map 100% reduce 100%
17/09/21 19:32:44 INFO mapreduce.Job: Job job_1505991512603_0004 completed successfully
17/09/21 19:32:44 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=822
        FILE: Number of bytes written=187379
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=635
        HDFS: Number of bytes written=526
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2031
        Total time spent by all reduces in occupied slots (ms)=2599
        Total time spent by all map tasks (ms)=2031
        Total time spent by all reduce tasks (ms)=2599
        Total vcore-seconds taken by all map tasks=2031
        Total vcore-seconds taken by all reduce tasks=2599
        Total megabyte-seconds taken by all map tasks=2079744
        Total megabyte-seconds taken by all reduce tasks=2661376
    Map-Reduce Framework
        Map input records=21
        Map output records=21
        Map output bytes=774
        Map output materialized bytes=822
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=21
        Reduce shuffle bytes=822
        Reduce input records=21
        Reduce output records=21
        Spilled Records=42
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=121
        CPU time spent (ms)=700
        Physical memory (bytes) snapshot=218284032
        Virtual memory (bytes) snapshot=726839296
        Total committed heap usage (bytes)=137433088
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=526
    File Output Format Counters
        Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/sortoutput
Found 2 items
-rw-r--r--   1 root supergroup          0 2017-09-21 19:32 /flow/sortoutput/_SUCCESS
-rw-r--r--   1 root supergroup        526 2017-09-21 19:32 /flow/sortoutput/part-r-00000
[root@master hadoop]# hadoop fs -cat /flow/sortoutput/part-r-00000
13726238888    2481    24681    27162
13726230503    2481    24681    27162
13925057413    63    11058    11121
18320173382    18    9531    9549
13502468823    102    7335    7437
13660577991    9    6960    6969
13922314466    3008    3720    6728
13560439658    5892    400    6292
84138413    4116    1432    5548
15013685858    27    3659    3686
15920133257    20    3156    3176
13602846565    12    1938    1950
15989002119    3    1938    1941
13926435656    1512    200    1712
18211575961    12    1527    1539
13560436666    954    200    1154
13480253104    180    200    380
13760778710    120    200    320
13826544101    0    200    200
13926251106    0    200    200
13719199419    0    200    200
[root@master hadoop]#

This post is part of the Tencent Cloud self-media sharing plan; readers are welcome to join and share as well.
