Learning Hadoop's serialization mechanism (with plenty of head-scratching): a flow-sum MapReduce development case, plus sorting the flow totals

I: Serialization concepts

Serialization is the process of converting structured objects into a byte stream. Deserialization is the reverse process: turning a byte stream back into structured objects. Java ships its own mechanism for this (java.io.Serializable).
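For contrast, here is a minimal sketch (my addition, not from the original walkthrough) of the plain java.io.Serializable round trip; the resulting byte stream carries class metadata and is comparatively bulky, which is exactly what Hadoop's Writable format is designed to improve on:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

//A quick sketch of plain Java serialization (java.io.Serializable), for contrast with Hadoop's Writable.
public class JavaSerializationDemo {

    static class Point implements Serializable {
        private static final long serialVersionUID = 1L;
        int x, y;
        Point(int x, int y) { this.x = x; this.y = y; }
    }

    public static void main(String[] args) throws Exception {
        //serialization: structured object -> byte stream
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new Point(1, 2));
        }
        byte[] bytes = bos.toByteArray();

        //deserialization: byte stream -> structured object
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            Point p = (Point) ois.readObject();
            System.out.println("restored (" + p.x + "," + p.y + ") from " + bytes.length + " bytes");
        }
    }
}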

II: Characteristics of Hadoop serialization

(1) Desired properties of a serialization format: compact (efficient use of storage space), fast (little extra overhead when reading and writing data), extensible (can transparently read data written in an older format), and interoperable (supports interaction across multiple languages). (2) Hadoop's serialization format: the Writable interface.

III: What Hadoop serialization is used for

(1) Serialization plays two major roles in a distributed environment: inter-process communication and persistent storage. (2) Hadoop uses it for communication between nodes.

IV: The Writable interface (classes that need to be serialized implement it)

(1) The Writable interface is a simple, efficient serialization scheme built on DataInput and DataOutput. (2) Every key and value type in MapReduce must implement the Writable interface. (3) Every key type in MapReduce must additionally implement the WritableComparable interface.

1: Create a FlowBean entity class and implement serialization:

 1 package com.flowSum;
 2 
 3 import java.io.DataInput;
 4 import java.io.DataOutput;
 5 import java.io.IOException;
 6 
 7 import org.apache.hadoop.io.Writable;
 8 
 9 /***
10  * 
11  * @author Administrator
12  * 1: write() serializes each object into the output stream
13  * 2: readFields() deserializes the object from the bytes of the input stream
14  * 3: key types implement WritableComparable;
15  *      when comparing Java value objects you would normally also override toString(), hashCode() and equals()
16  * 
17  */
18 public class FlowBean implements Writable{
19 
20     private String phoneNumber;//phone number
21     private long upFlow;//upstream traffic
22     private long downFlow;//downstream traffic
23     private long sumFlow;//total traffic
24     
25     
26     
27     public String getPhoneNumber() {
28         return phoneNumber;
29     }
30     public void setPhoneNumber(String phoneNumber) {
31         this.phoneNumber = phoneNumber;
32     }
33     public long getUpFlow() {
34         return upFlow;
35     }
36     public void setUpFlow(long upFlow) {
37         this.upFlow = upFlow;
38     }
39     public long getDownFlow() {
40         return downFlow;
41     }
42     public void setDownFlow(long downFlow) {
43         this.downFlow = downFlow;
44     }
45     public long getSumFlow() {
46         return sumFlow;
47     }
48     public void setSumFlow(long sumFlow) {
49         this.sumFlow = sumFlow;
50     }
51 
52     //a parameterized constructor for convenient initialization of the object's data
53     public FlowBean(String phoneNumber, long upFlow, long downFlow) {
54         this.phoneNumber = phoneNumber;
55         this.upFlow = upFlow;
56         this.downFlow = downFlow;
57         this.sumFlow = upFlow + downFlow;
58     }
59     //deserialization uses reflection to call the no-arg constructor, so an empty constructor must be defined
60     public FlowBean() {
61     }
62     
63     //override toString()
64     @Override
65     public String toString() {
66         return "" + upFlow + "\t" + downFlow + "\t" + sumFlow + "";
67     }
68     
69     
70     //deserialize the object's data from the stream
71     //fields must be read in exactly the same order in which they were written
72     @Override
73     public void readFields(DataInput in) throws IOException {
74         phoneNumber = in.readUTF();
75         upFlow = in.readLong(); 
76         downFlow = in.readLong(); 
77         sumFlow = in.readLong(); 
78          
79     }
80     
81     //serialize the object's data into the stream
82     @Override
83     public void write(DataOutput out) throws IOException {
84         out.writeUTF(phoneNumber);
85         out.writeLong(upFlow);
86         out.writeLong(downFlow);
87         out.writeLong(sumFlow);
88         
89     }
90 
91     
92 }
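Because write() and readFields() must touch the fields in exactly the same order, it is worth checking the round trip locally before shipping a jar to the cluster. The following throwaway test is my own sketch (not part of the original post) and only assumes the FlowBean class above:

package com.flowSum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

//Round-trip check for FlowBean: serialize with write(), read back with readFields(),
//and make sure the restored fields match the originals.
public class FlowBeanRoundTrip {

    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        //serialize into an in-memory byte stream
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));

        //deserialize from the same bytes (uses the no-arg constructor, just like the framework does)
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        //expected output: 13726230503  2481  24681  27162
        System.out.println(copy.getPhoneNumber() + "\t" + copy);
    }
}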

Create a FlowSumMapper class that extends the Mapper class:

 1 package com.flowSum;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.commons.lang.StringUtils;
 6 import org.apache.hadoop.io.LongWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Mapper;
 9 /***
10  * 
11  * @author Administrator
12  * 1: FlowBean is a custom data type of ours; to be transferred between Hadoop nodes it has to follow Hadoop's serialization rules,
13  *      so it must implement the corresponding Hadoop serialization interface
14  * 2: Text can be thought of as the Writable equivalent of java.lang.String, for UTF-8 sequences.
15  */
16 public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
17 
18     //take one line of the log, split it into fields, and extract the ones we need: phone number, upstream traffic, downstream traffic
19     //then wrap them as a key-value pair and emit them
20     
21     @Override
22     protected void map(LongWritable key, Text value, Context context)
23             throws IOException, InterruptedException {
24         //get one line of data
25         String line = value.toString();
26         //split it into fields
27         String[] fields = StringUtils.split(line, "\t");
28         //get the phone number field
29         String phoneNumber = fields[1];
30         //get the upstream traffic field
31         long up_flow = Long.parseLong(fields[7]);
32         //get the downstream traffic field
33         long down_flow = Long.parseLong(fields[8]);
34         
35         //final step: wrap the data as a key-value pair and emit it
36         context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
37         
38     }
39     
40 }
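Before submitting the job, it can save a trip to the cluster to verify the field indices against a single sample record. The snippet below is a local sketch of mine; it assumes the tab-separated layout of the sample data shown later in the post, where fields[1] is the phone number and fields[7]/fields[8] are the upstream/downstream byte counts:

import org.apache.commons.lang.StringUtils;

//Local check of the indices used in FlowSumMapper.
//Note: commons-lang StringUtils.split() merges adjacent separators, so an empty
//host column (two tabs in a row) does not produce an empty token here.
public class FieldIndexCheck {

    public static void main(String[] args) {
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t120.196.100.82"
                + "\ti02.c.aliimg.com\t\t24\t27\t2481\t24681\t200";
        String[] fields = StringUtils.split(line, "\t");
        //expected: phone = 13726230503, up = 2481, down = 24681
        System.out.println("phone = " + fields[1] + ", up = " + fields[7] + ", down = " + fields[8]);
    }
}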

Create a FlowSumReducer class that extends the Reducer class:

 1 package com.flowSum;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.io.Text;
 6 import org.apache.hadoop.mapreduce.Reducer;
 7 
 8 public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
 9 
10     //the framework calls our reduce method once per group <phone number, {flowbean, flowbean, flowbean...}>
11     //the business logic in reduce is simply to iterate over the values, accumulate the sums, then write the result out
12     @Override
13     protected void reduce(Text key, Iterable<FlowBean> values, Context context)
14             throws IOException, InterruptedException {
15         //counters for upstream and downstream traffic
16         long up_flow_counter = 0;
17         long down_flow_counter = 0;
18         
19         //accumulate the upstream and downstream traffic
20         for(FlowBean bean : values){
21             up_flow_counter += bean.getUpFlow();
22             down_flow_counter += bean.getDownFlow();
23         }
24         
25         //write out the result
26         context.write(key, new FlowBean(key.toString(), up_flow_counter, down_flow_counter));
27         
28     }
29     
30 }
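To make the "one reduce call per key group" idea concrete, here is a plain-Java sketch of mine (an illustration, not framework code) of the computation the reducer performs after the shuffle has grouped the map output by phone number:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

//Plain-Java illustration of the reduce step: for each phone number, sum the
//per-record upstream and downstream traffic and emit one totals line.
public class ReduceLogicSketch {

    public static void main(String[] args) {
        //phone -> list of {upFlow, downFlow} pairs, as the shuffle would deliver them
        Map<String, List<long[]>> grouped = new HashMap<>();
        grouped.computeIfAbsent("13560439658", k -> new ArrayList<>()).add(new long[]{1116, 954});
        grouped.computeIfAbsent("13560439658", k -> new ArrayList<>()).add(new long[]{918, 4938});

        for (Map.Entry<String, List<long[]>> entry : grouped.entrySet()) {
            long up = 0, down = 0;
            for (long[] flows : entry.getValue()) {
                up += flows[0];
                down += flows[1];
            }
            //same shape as the reducer's output: phone, up, down, up + down
            System.out.println(entry.getKey() + "\t" + up + "\t" + down + "\t" + (up + down));
        }
    }
}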

Create a FlowSumRunner class that extends Configured and implements Tool, the standard way to describe and submit a job:

 1 package com.flowSum;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.conf.Configured;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.mapreduce.Job;
 7 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 8 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 9 import org.apache.hadoop.util.Tool;
10 import org.apache.hadoop.util.ToolRunner;
11 
12 import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
13 /***
14  * 
15  * @author Administrator
16  * 1: the standard pattern for a job description and submission class
17  */
18 public class FlowSumRunner extends Configured implements Tool{
19 
20     
21     @Override
22     public int run(String[] args) throws Exception {
23         //create the configuration
24         Configuration conf = new Configuration();
25         //get a job instance
26         Job job = Job.getInstance(conf);
27         
28         //tell the framework which jar contains the classes this job uses
29         job.setJarByClass(FlowSumRunner.class);
30         
31         //the mapper and reducer classes used by this job
32         job.setMapperClass(FlowSumMapper.class);
33         job.setReducerClass(FlowSumReducer.class);
34         
35         //specify the key-value types of the mapper's output
36         job.setMapOutputKeyClass(Text.class);
37         job.setMapOutputValueClass(FlowBean.class);
38         
39         //specify the key-value types of the reducer's output
40         job.setOutputKeyClass(Text.class);
41         job.setOutputValueClass(FlowBean.class);
42         
43         //specify where the input data to process is stored
44         //FileInputFormat is the base class of all InputFormat implementations that use files as their data source;
45         //it keeps track of all the files that serve as the job's input and implements the logic for computing input splits.
46         //How individual records are read is left to subclasses such as TextInputFormat.
47         FileInputFormat.setInputPaths(job, new Path(args[0]));
48         
49         //specify where the output of the job should be stored
50         FileOutputFormat.setOutputPath(job, new Path(args[1]));
51         
52         //submit the job to the cluster
53         //job.waitForCompletion(true);
54         //return 0 on success, 1 otherwise
55         return job.waitForCompletion(true) ? 0 : 1;
56     }
57 
58     public static void main(String[] args) throws Exception {
59         //the standard way to invoke a Tool
60         int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
61         //exit when execution finishes
62         System.exit(res);
63     }
64         
65 }
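One small note on the Tool pattern (my observation, not a change made in the post): when a job is launched through ToolRunner, the Configuration populated from generic options such as -D key=value is available via getConf(), so run() would normally reuse it rather than building a fresh Configuration. That is also the likely reason the job log further down still prints the "Implement the Tool interface and execute your application with ToolRunner" warning. A minimal sketch of the idiom, under the made-up class name ToolPatternSketch:

package com.flowSum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//Sketch of the usual Tool idiom: reuse the Configuration that ToolRunner has already
//populated from generic command-line options instead of creating a new one in run().
public class ToolPatternSketch extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();  //injected by ToolRunner.run(...)
        //e.g. hadoop jar flow.jar com.flowSum.ToolPatternSketch -D mapreduce.job.reduces=2 /flow/data /flow/output
        System.out.println("mapreduce.job.reduces = " + conf.get("mapreduce.job.reduces", "<default>"));
        System.out.println("remaining args = " + java.util.Arrays.toString(args));
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new ToolPatternSketch(), args));
    }
}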

Then package the program into a jar and upload it to the VM together with the sample data (steps omitted); here is the sample data:

1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157995052     13826544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4            4    0    264    0    200
1363157991076     13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
1363154400022     13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
1363157993044     18211575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
1363157995074     84138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
1363157993055     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
1363157995033     15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    3156    2936    200
1363157983019     13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
1363157984041     13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    6960    690    200
1363157973098     15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    3659    3538    200
1363157986029     15989002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    1938    180    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157984040     13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    1938    2910    200
1363157995093     13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
1363157982040     13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    7335    110349    200
1363157986072     18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    2412    200
1363157990043     13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    48243    200
1363157988072     13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
1363157985066     13726238888    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157993055     13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200

You can see that the jar and the sample data have been uploaded to the VM.

Next, upload the data to the HDFS cluster (a pseudo-distributed cluster here):

First create an empty /flow directory on the cluster, then create a data directory inside it, and finally put the data file into the data directory:

Then run the program; since it takes arguments, note that the last two items on the command line are the input and output paths:

And then it threw an error like this, which left me completely baffled:

Error: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
    at java.lang.Class.asSubclass(Class.java:3165)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:884)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:981)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:80)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:675)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:747)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

Then, on the theory that whatever you are learning has surely been learned by someone before you, and that some kind soul has posted the very same error online, a quick search turned up the cause:

It turned out the Text import was wrong (it pays to be careful here, otherwise you are in for trouble). It should not be import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text; but import org.apache.hadoop.io.Text;.

Repackage, upload to the VM, run again, and this error appears:

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/flow/output already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
    at com.flowSum.FlowSumRunner.run(FlowSumRunner.java:55)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.flowSum.FlowSumRunner.main(FlowSumRunner.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

So delete the /flow/output directory: the output directory is created automatically by the program and must not already exist:
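To avoid deleting the directory by hand before every rerun, the driver could also remove a pre-existing output path before submitting the job. The helper below is a sketch of my own (the class and method names are made up for illustration); it would be called from run() just before FileOutputFormat.setOutputPath():

package com.flowSum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

//Helper sketch: delete an existing output directory so a rerun does not fail
//with FileAlreadyExistsException. Careful: this recursively deletes the path.
public class OutputDirCleaner {

    public static void deleteIfExists(Configuration conf, String outputPath) throws Exception {
        Path out = new Path(outputPath);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);  //true = recursive delete
        }
    }
}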

Finally, run the program again (it takes arguments, so remember that the last two items on the command line are the input and output paths):

Then it threw an array index out of bounds exception; I figured the test data might not be clean:

Error: java.lang.ArrayIndexOutOfBoundsException: 1
    at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:29)
    at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

So I hand-crafted a data set, shown below:

(Well, later the original test data ran fine again. In short, test a few times; pitfalls everywhere!)

1363157985066      13726230501      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      2481      241      200
1363157985061      13726230502      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481      681      200
1363157985062      13726230502      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      3481     681      200
1363157985063      13726230503      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      4481     4681      200
1363157985064      13726230504      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      5481     4681      200
1363157985065      13726230505      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      6481     2681      200
1363157985066      13726230506      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      7481     2481      200
1363157985067      13726230507      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      8481     2461      200
1363157985067      13726230507      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     281      200
1363157985068      13726230508      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      2481     2681     200
1363157985068      13726230508      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      3481     24681     200
1363157985069      13726230509      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      4481     681     200
1363157985060      13726230500      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     24681     200
1363157985061      13726230501      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     681     200
1363157985066      13726230502      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     81         200
1363157985063      13726230503      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     681     200
1363157985063      13726230504      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     681     200
1363157985064      13726230505      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     2681     200
1363157985065      13726230506      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      1481     681     200
1363157985066      13726230507      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      81      24681     200
1363157985067      13726230508      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      481     241     200
1363157985068      13726230508      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      481     681     200
1363157985068      13726230503      00-FD-07-A4-72-B8:CMCC      120.196.100.82      i02.c.aliimg.com      24      27      241     681     200

In the end I changed String[] fields = StringUtils.split(line, "\t"); to String[] fields = StringUtils.split(line, " ");

(Later tests showed that String[] fields = StringUtils.split(line, "\t"); works too. At first I thought the amount of whitespace also mattered, but the code was fine; the problem was the test data.)
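Separator aside, a mapper that skips malformed records instead of letting the task die makes dirty input far less painful. The variant below is a sketch of mine (not the version used in this post, and the class name is made up): it drops lines with too few fields or non-numeric traffic columns. Registering it with job.setMapperClass(FlowSumSafeMapper.class) would be the only change needed in the driver.

package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//Defensive variant of FlowSumMapper: malformed records are skipped instead of
//throwing ArrayIndexOutOfBoundsException or NumberFormatException and failing the task.
public class FlowSumSafeMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = StringUtils.split(value.toString(), "\t");
        if (fields == null || fields.length < 9) {
            return;  //not enough fields: skip this line
        }
        try {
            String phoneNumber = fields[1];
            long upFlow = Long.parseLong(fields[7]);
            long downFlow = Long.parseLong(fields[8]);
            context.write(new Text(phoneNumber), new FlowBean(phoneNumber, upFlow, downFlow));
        } catch (NumberFormatException e) {
            //traffic columns were not numeric: ignore this record
        }
    }
}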

 1 package com.flowSum;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.commons.lang.StringUtils;
 6 import org.apache.hadoop.io.LongWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Mapper;
 9 /***
10  * 
11  * @author Administrator
12  * 1: FlowBean is a custom data type of ours; to be transferred between Hadoop nodes it has to follow Hadoop's serialization rules,
13  *      so it must implement the corresponding Hadoop serialization interface
14  * 2: Text can be thought of as the Writable equivalent of java.lang.String, for UTF-8 sequences.
15  */
16 public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
17 
18     //take one line of the log, split it into fields, and extract the ones we need: phone number, upstream traffic, downstream traffic
19     //then wrap them as a key-value pair and emit them
20     
21     @Override
22     protected void map(LongWritable key, Text value, Context context)
23             throws IOException, InterruptedException {
24         //get one line of data
25         String line = value.toString();
26         //split it into fields
27         String[] fields = StringUtils.split(line, "      ");
28         //get the phone number field
29         String phoneNumber = fields[1];
30         //get the upstream traffic field
31         long up_flow = Long.parseLong(fields[7]);
32         //get the downstream traffic field
33         long down_flow = Long.parseLong(fields[8]);
34         
35         //final step: wrap the data as a key-value pair and emit it
36         context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
37         
38     }
39     
40 }

Package it, upload it to the VM, and run it (a successful run looks like this):

[root@master hadoop]# hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output
17/09/20 09:35:26 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/20 09:35:26 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/20 09:35:27 INFO input.FileInputFormat: Total input paths to process : 1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: number of splits:1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505814887677_0007
17/09/20 09:35:27 INFO impl.YarnClientImpl: Submitted application application_1505814887677_0007
17/09/20 09:35:27 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505814887677_0007/
17/09/20 09:35:27 INFO mapreduce.Job: Running job: job_1505814887677_0007
17/09/20 09:35:33 INFO mapreduce.Job: Job job_1505814887677_0007 running in uber mode : false
17/09/20 09:35:33 INFO mapreduce.Job:  map 0% reduce 0%
17/09/20 09:35:37 INFO mapreduce.Job:  map 100% reduce 0%
17/09/20 09:35:43 INFO mapreduce.Job:  map 100% reduce 100%
17/09/20 09:35:43 INFO mapreduce.Job: Job job_1505814887677_0007 completed successfully
17/09/20 09:35:43 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1179
        FILE: Number of bytes written=187971
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=2467
        HDFS: Number of bytes written=279
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2691
        Total time spent by all reduces in occupied slots (ms)=2582
        Total time spent by all map tasks (ms)=2691
        Total time spent by all reduce tasks (ms)=2582
        Total vcore-seconds taken by all map tasks=2691
        Total vcore-seconds taken by all reduce tasks=2582
        Total megabyte-seconds taken by all map tasks=2755584
        Total megabyte-seconds taken by all reduce tasks=2643968
    Map-Reduce Framework
        Map input records=23
        Map output records=23
        Map output bytes=1127
        Map output materialized bytes=1179
        Input split bytes=93
        Combine input records=0
        Combine output records=0
        Reduce input groups=10
        Reduce shuffle bytes=1179
        Reduce input records=23
        Reduce output records=10
        Spilled Records=46
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=126
        CPU time spent (ms)=1240
        Physical memory (bytes) snapshot=218099712
        Virtual memory (bytes) snapshot=726839296
        Total committed heap usage (bytes)=137433088
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=2374
    File Output Format Counters
        Bytes Written=279
[root@master hadoop]#

The output looks like this:

All in all, when learning something new, errors of every kind are unavoidable; settle down and work through them.

2: The flow-sum sorting case in practice:

Write both the Mapper and the Reducer as static inner classes. (I ran into the same maddening problem as above: with String[] fields = StringUtils.split(line, "\t"); the job simply would not run and kept throwing array index out of bounds exceptions; switching to String[] fields = StringUtils.split(line, " "); made it run. Still baffled.)

  1 package com.flowSort;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.commons.lang.StringUtils;
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.NullWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 
 17 
 18 public class FlowSortMapReduce {
 19 
 20     /***
 21      * the Mapper as a static inner class
 22      * @author Administrator
 23      *
 24      */
 25     public static class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
 26         
 27         //take one line, split out the fields, wrap them in a FlowBean and emit it as the key
 28         @Override
 29         protected void map(LongWritable key, Text value,Context context)
 30                 throws IOException, InterruptedException {
 31             //get one line of data
 32             String line = value.toString();
 33             //split the line into fields
 34             String[] fields = StringUtils.split(line, " ");
 35             
 36             //pull the individual values out of the record
 37             String phoneNumber = fields[0];
 38             long up_flow = Long.parseLong(fields[1]);
 39             long down_flow = Long.parseLong(fields[2]);
 40             
 41             //wrap the data up and pass it on to the reduce stage
 42             context.write(new FlowBean(phoneNumber, up_flow, down_flow), NullWritable.get());
 43         }
 44         
 45     }
 46     
 47     /***
 48      * the Reducer as a static inner class
 49      * @author Administrator
 50      *
 51      */
 52     public static class FlowSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{
 53         
 54         @Override
 55         protected void reduce(FlowBean key, Iterable<NullWritable> values,Context context)
 56                 throws IOException, InterruptedException {
 57             
 58             String phoneNumber = key.getPhoneNumber();
 59             context.write(new Text(phoneNumber), key);
 60         }
 61     }
 62     
 63     
 64     /***
 65      * main method
 66      * @param args
 67      * @throws InterruptedException 
 68      * @throws IOException 
 69      * @throws ClassNotFoundException 
 70      */
 71     public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
 72         //create the configuration
 73         Configuration conf = new Configuration();
 74         //get a job instance
 75         Job job = Job.getInstance(conf);
 76         
 77         //tell the framework which jar contains the classes this job uses
 78         job.setJarByClass(FlowSortMapReduce.class);
 79         
 80         //the mapper and reducer classes used by this job
 81         job.setMapperClass(FlowSortMapper.class);
 82         job.setReducerClass(FlowSortReducer.class);
 83         
 84         //specify the key-value types of the mapper's output
 85         job.setMapOutputKeyClass(FlowBean.class);
 86         job.setMapOutputValueClass(NullWritable.class);
 87         
 88         //specify the key-value types of the reducer's output (Text key)
 89         job.setOutputKeyClass(Text.class);
 90         job.setOutputValueClass(FlowBean.class);
 91         
 92         //specify where the input data to process is stored
 93         //FileInputFormat is the base class of all InputFormat implementations that use files as their data source;
 94         //it keeps track of all the files that serve as the job's input and implements the logic for computing input splits.
 95         //How individual records are read is left to subclasses such as TextInputFormat.
 96         FileInputFormat.setInputPaths(job, new Path(args[0]));
 97         
 98         //specify where the output of the job should be stored
 99         FileOutputFormat.setOutputPath(job, new Path(args[1]));
100         
101         //submit the job to the cluster
102         //job.waitForCompletion(true);
103         //return 0 on success, 1 otherwise
104         System.exit(job.waitForCompletion(true) ? 0 : 1);
105     }
106     
107 }

Rework the entity class so the results can be sorted by total traffic:

  1 package com.flowSort;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 
  7 import org.apache.hadoop.io.Writable;
  8 import org.apache.hadoop.io.WritableComparable;
  9 
 10 /***
 11  * 
 12  * @author Administrator
 13  * 1: write() serializes each object into the output stream
 14  * 2: readFields() deserializes the object from the bytes of the input stream
 15  * 3: key types implement WritableComparable;
 16  *      when comparing Java value objects you would normally also override toString(), hashCode() and equals()
 17  * 
 18  */
 19 public class FlowBean implements WritableComparable<FlowBean>{
 20 
 21     
 22     private String phoneNumber;//phone number
 23     private long upFlow;//upstream traffic
 24     private long downFlow;//downstream traffic
 25     private long sumFlow;//total traffic
 26     
 27     
 28     
 29     public String getPhoneNumber() {
 30         return phoneNumber;
 31     }
 32     public void setPhoneNumber(String phoneNumber) {
 33         this.phoneNumber = phoneNumber;
 34     }
 35     public long getUpFlow() {
 36         return upFlow;
 37     }
 38     public void setUpFlow(long upFlow) {
 39         this.upFlow = upFlow;
 40     }
 41     public long getDownFlow() {
 42         return downFlow;
 43     }
 44     public void setDownFlow(long downFlow) {
 45         this.downFlow = downFlow;
 46     }
 47     public long getSumFlow() {
 48         return sumFlow;
 49     }
 50     public void setSumFlow(long sumFlow) {
 51         this.sumFlow = sumFlow;
 52     }
 53 
 54     //a parameterized constructor for convenient initialization of the object's data
 55     public FlowBean(String phoneNumber, long upFlow, long downFlow) {
 56         this.phoneNumber = phoneNumber;
 57         this.upFlow = upFlow;
 58         this.downFlow = downFlow;
 59         this.sumFlow = upFlow + downFlow;
 60     }
 61     //deserialization uses reflection to call the no-arg constructor, so an empty constructor must be defined
 62     public FlowBean() {
 63     }
 64     
 65     //override toString()
 66     @Override
 67     public String toString() {
 68         return "" + upFlow + "\t" + downFlow + "\t" + sumFlow + "";
 69     }
 70     
 71     
 72     //deserialize the object's data from the stream
 73     //fields must be read in exactly the same order in which they were written
 74     @Override
 75     public void readFields(DataInput in) throws IOException {
 76         phoneNumber = in.readUTF();
 77         upFlow = in.readLong(); 
 78         downFlow = in.readLong(); 
 79         sumFlow = in.readLong(); 
 80          
 81     }
 82     
 83     //serialize the object's data into the stream
 84     @Override
 85     public void write(DataOutput out) throws IOException {
 86         out.writeUTF(phoneNumber);
 87         out.writeLong(upFlow);
 88         out.writeLong(downFlow);
 89         out.writeLong(sumFlow);
 90         
 91     }
 92     
 93     //how the traffic comparison is implemented
 94     @Override
 95     public int compareTo(FlowBean o) {
 96         
 97         //return -1 when greater, 1 when less than or equal, giving a descending sort
 98         return sumFlow > o.sumFlow ? -1 : 1;
 99     }
100 
101     
102 
103 }
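Since compareTo() is what drives the ordering of keys during the shuffle, it can be sanity-checked locally with an ordinary sort. The snippet below is a throwaway sketch of mine that only assumes the FlowBean class above:

package com.flowSort;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

//Local check of FlowBean.compareTo(): after sorting, the bean with the largest
//total flow should come first, which is the order the reducer will see the keys in.
public class CompareToCheck {

    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<FlowBean>();
        beans.add(new FlowBean("13480253104", 180, 200));     //sum 380
        beans.add(new FlowBean("13726230503", 2481, 24681));  //sum 27162
        beans.add(new FlowBean("13925057413", 63, 11058));    //sum 11121

        Collections.sort(beans);  //uses compareTo(), i.e. descending by sumFlow

        for (FlowBean bean : beans) {
            System.out.println(bean.getPhoneNumber() + "\t" + bean);
        }
        //expected order: 13726230503, 13925057413, 13480253104
    }
}

Note that compareTo() here never returns 0, so two beans with the same total are never treated as the same key at the reducer; that is also why both 13726238888 and 13726230503 appear as separate lines with the total 27162 in the sorted output below.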

And this is the result; problems all the way, but it works:

[root@master hadoop]# hadoop jar flowsort.jar com.flowSort.FlowSortMapReduce /flow/output4 /flow/sortoutput
17/09/21 19:32:28 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/21 19:32:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/21 19:32:29 INFO input.FileInputFormat: Total input paths to process : 1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: number of splits:1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505991512603_0004
17/09/21 19:32:29 INFO impl.YarnClientImpl: Submitted application application_1505991512603_0004
17/09/21 19:32:29 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505991512603_0004/
17/09/21 19:32:29 INFO mapreduce.Job: Running job: job_1505991512603_0004
17/09/21 19:32:33 INFO mapreduce.Job: Job job_1505991512603_0004 running in uber mode : false
17/09/21 19:32:33 INFO mapreduce.Job:  map 0% reduce 0%
17/09/21 19:32:38 INFO mapreduce.Job:  map 100% reduce 0%
17/09/21 19:32:44 INFO mapreduce.Job:  map 100% reduce 100%
17/09/21 19:32:44 INFO mapreduce.Job: Job job_1505991512603_0004 completed successfully
17/09/21 19:32:44 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=822
        FILE: Number of bytes written=187379
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=635
        HDFS: Number of bytes written=526
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2031
        Total time spent by all reduces in occupied slots (ms)=2599
        Total time spent by all map tasks (ms)=2031
        Total time spent by all reduce tasks (ms)=2599
        Total vcore-seconds taken by all map tasks=2031
        Total vcore-seconds taken by all reduce tasks=2599
        Total megabyte-seconds taken by all map tasks=2079744
        Total megabyte-seconds taken by all reduce tasks=2661376
    Map-Reduce Framework
        Map input records=21
        Map output records=21
        Map output bytes=774
        Map output materialized bytes=822
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=21
        Reduce shuffle bytes=822
        Reduce input records=21
        Reduce output records=21
        Spilled Records=42
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=121
        CPU time spent (ms)=700
        Physical memory (bytes) snapshot=218284032
        Virtual memory (bytes) snapshot=726839296
        Total committed heap usage (bytes)=137433088
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=526
    File Output Format Counters
        Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/sortoutput
Found 2 items
-rw-r--r--   1 root supergroup          0 2017-09-21 19:32 /flow/sortoutput/_SUCCESS
-rw-r--r--   1 root supergroup        526 2017-09-21 19:32 /flow/sortoutput/part-r-00000
[root@master hadoop]# hadoop fs -cat /flow/sortoutput/part-r-00000
13726238888    2481    24681    27162
13726230503    2481    24681    27162
13925057413    63    11058    11121
18320173382    18    9531    9549
13502468823    102    7335    7437
13660577991    9    6960    6969
13922314466    3008    3720    6728
13560439658    5892    400    6292
84138413    4116    1432    5548
15013685858    27    3659    3686
15920133257    20    3156    3176
13602846565    12    1938    1950
15989002119    3    1938    1941
13926435656    1512    200    1712
18211575961    12    1527    1539
13560436666    954    200    1154
13480253104    180    200    380
13760778710    120    200    320
13826544101    0    200    200
13926251106    0    200    200
13719199419    0    200    200
[root@master hadoop]#
