下面是排序代码:
package gruopcount;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Second MapReduce job of the sales-report pipeline: reads the
 * "salesman\tamount" output of GruopCount and re-emits it sorted by
 * sales amount in DESCENDING order (see {@link Sort}).
 *
 * Note: extending {@link Configuration} is unusual (extending
 * {@code Configured} would be idiomatic), but the Tool contract is
 * satisfied manually via {@link #getConf()} / {@link #setConf(Configuration)},
 * so the class works with {@link ToolRunner} either way.
 */
public class OrderGroupCount extends Configuration implements Tool {

    /** Configuration injected by ToolRunner before {@link #run(String[])}. */
    private Configuration configuration;

    @Override
    public Configuration getConf() {
        return this.configuration;
    }

    /** Job counter; TIMER is incremented once per malformed input record. */
    enum Counter {
        TIMER
    }

    @Override
    public void setConf(Configuration arg0) {
        this.configuration = arg0;
    }

    /**
     * Mapper: parses a tab-separated "salesman\tamount" line and emits
     * (amount, salesman) so the shuffle phase sorts records by amount.
     */
    private static class Map extends Mapper<LongWritable, Text, DoubleWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, DoubleWritable, Text>.Context context)
                throws IOException, InterruptedException {
            try {
                String[] columns = value.toString().split("\t");
                // Column 0: salesman name.
                String salesman = columns[0];
                // Column 1: sales amount, used as the map output key so the
                // framework sorts by it (parseDouble avoids needless boxing).
                double salesMoney = Double.parseDouble(columns[1]);
                context.write(new DoubleWritable(salesMoney), new Text(salesman));
            } catch (Exception e) {
                // Count malformed lines instead of failing the whole job.
                context.getCounter(Counter.TIMER).increment(1);
                e.printStackTrace();
            }
        }
    }

    /**
     * Reducer: swaps the pair back to (salesman, amount) for the final
     * output, preserving the amount-sorted order established by the shuffle.
     */
    private static class Reduce extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(DoubleWritable key, Iterable<Text> values,
                Reducer<DoubleWritable, Text, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            // Several salesmen may share the same amount; write one line each.
            for (Text v : values) {
                context.write(v, key);
            }
        }
    }

    /**
     * Sort comparator that negates the natural DoubleWritable ordering,
     * turning the default ascending shuffle sort into a descending one.
     */
    private static class Sort extends WritableComparator {
        public Sort() {
            super(DoubleWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }
    }

    /**
     * Example partitioner that routes keys to reducers by name prefix.
     * NOTE(review): kept for reference only — it is typed for Text keys,
     * while this job's map output key is DoubleWritable, so it cannot be
     * enabled for this job without changing the map output types.
     */
    private static class SectionPartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {
            String str = key.toString();
            if (str.startsWith("zh")) {
                return 0;
            } else if (str.startsWith("l")) {
                return 1;
            } else {
                return 2;
            }
        }
    }

    /**
     * Configures and runs the sorting job.
     *
     * @param arg0 arg0[1] = input path, arg0[2] = output path
     *             (arg0[0] and arg0[3] are unused by this job)
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] arg0) throws Exception {
        Job job = Job.getInstance(getConf(), "groupcount");
        job.setJarByClass(OrderGroupCount.class);
        FileInputFormat.setInputPaths(job, new Path(arg0[1]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[2]));
        job.setMapperClass(Map.class);
        job.setSortComparatorClass(Sort.class);
        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // The map output types (DoubleWritable, Text) differ from the final
        // reduce output types (Text, DoubleWritable), so both pairs must be
        // declared explicitly; declaring only the reduce pair (or, as before,
        // only the map pair) leaves the job metadata inconsistent.
        job.setMapOutputKeyClass(DoubleWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Optional tuning hooks, disabled by default.
        // NOTE(review): Reduce cannot serve as a combiner here — its output
        // types (Text, DoubleWritable) do not match the required map output
        // types (DoubleWritable, Text). SectionPartitioner is likewise typed
        // for Text keys; see its class comment.
        //job.setCombinerClass(Reduce.class);
        //job.setPartitionerClass(SectionPartitioner.class);
        //job.setNumReduceTasks(3);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Local/IDE entry point with hard-coded HDFS paths: runs the grouping
     * job (GruopCount) first and, only if it succeeds, the sorting job,
     * then deletes the intermediate output directory.
     */
    public static void main(String[] args) throws Exception {
        String[] args2 = new String[4];
        args2[0] = "ss"; // placeholder; run() reads indices 1..2 only
        args2[1] = "hdfs://192.168.1.55:9000/groupcount-in/t_product_sales.txt";
        args2[2] = "hdfs://192.168.1.55:9000/groupcount-out";
        args2[3] = "2015-05";
        Configuration configuration = new Configuration();
        configuration.set("querydate", args2[3]);
        // Run the sorting job only if the grouping job succeeded.
        if (0 == ToolRunner.run(configuration, new GruopCount(), args2)) {
            args2[1] = "hdfs://192.168.1.55:9000/groupcount-out/part-r-00000";
            args2[2] = "hdfs://192.168.1.55:9000/ordergroupcount-out";
            ToolRunner.run(configuration, new OrderGroupCount(), args2);
            // Delete the intermediate output directory of the first job.
            FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.1.55:9000"), configuration);
            Path path = new Path("/groupcount-out");
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
        }
    }
}
然后是运行日志和结果:
11:54:54,443 INFO Job:1384 - Job job_local470508679_0002 completed successfully
11:54:54,443 DEBUG UserGroupInformation:1652 - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:764)
11:54:54,454 INFO Job:1391 - Counters: 38
File System Counters
FILE: Number of bytes read=903286
FILE: Number of bytes written=1916480
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=973390
HDFS: Number of bytes written=27282
HDFS: Number of read operations=39
HDFS: Number of large read operations=0
HDFS: Number of write operations=16
Map-Reduce Framework
Map input records=404
Map output records=404
Map output bytes=6840
Map output materialized bytes=7654
Input split bytes=117
Combine input records=0
Combine output records=0
Reduce input groups=404
Reduce shuffle bytes=7654
Reduce input records=404
Reduce output records=404
Spilled Records=808
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=671088640
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=9094
File Output Format Counters
Bytes Written=9094
11:54:54,454 DEBUG UserGroupInformation:1652 - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:322)
11:54:54,455 DEBUG UserGroupInformation:1652 - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:322)
11:54:54,455 DEBUG Client:1024 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop sending #45
11:54:54,456 DEBUG Client:1081 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop got value #45
11:54:54,457 DEBUG ProtobufRpcEngine:253 - Call: getFileInfo took 2ms
11:54:54,457 DEBUG Client:1024 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop sending #46
11:54:54,519 DEBUG Client:1081 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop got value #46
11:54:54,519 DEBUG ProtobufRpcEngine:253 - Call: delete took 62ms
11:54:54,521 DEBUG Client:97 - stopping client from cache: org.apache.hadoop.ipc.Client@7ff5b38d
11:54:54,521 DEBUG Client:103 - removing client from cache: org.apache.hadoop.ipc.Client@7ff5b38d
11:54:54,521 DEBUG Client:110 - stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@7ff5b38d
11:54:54,521 DEBUG Client:1234 - Stopping client
11:54:54,529 DEBUG Client:1184 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop: closed
11:54:54,529 DEBUG Client:979 - IPC Client (129566217) connection to /192.168.1.55:9000 from hadoop: stopped, remaining connections 0
张三67 95750.46
张三15 93783.38
李四90 93780.04
麻子39 90876.53
张三10 90596.34
王二87 90354.13
张三78 89810.42000000001
王二8 89567.97000000002
王二97 88581.10999999999
王二62 88430.04000000001
王二21 88239.09000000001
张三11 87730.02
麻子89 87449.76
王二61 87189.91999999998
麻子72 87002.76000000001
麻子84 86922.65
李四97 86917.75
李四3 86378.59
王二57 86276.37000000001
………………
mysql:
如果要打包,需要修改main方法:
/**
 * Packaged (command-line) entry point: takes all paths/dates from the
 * program arguments instead of hard-coding them.
 *
 * Expected arguments (4): args[0] placeholder (unused), args[1] input
 * path, args[2] output path, args[3] query date (yyyy-MM). Note that
 * run() reads indices 1 and 2, so the first argument is only a filler.
 */
public static void main(String[] args) throws Exception {
    // Guard against the ArrayIndexOutOfBoundsException that a short
    // argument list would otherwise cause at args[3] below.
    if (args.length < 4) {
        System.err.println("Usage: <placeholder> <input-path> <output-path> <query-date yyyy-MM>");
        System.exit(1);
    }
    Configuration configuration = new Configuration();
    configuration.set("querydate", args[3]);
    // Run the sorting job only if the grouping job succeeded.
    if (0 == ToolRunner.run(configuration, new GruopCount(), args)) {
        args[1] = "hdfs://192.168.1.55:9000/groupcount-out/part-r-00000";
        args[2] = "hdfs://192.168.1.55:9000/ordergroupcount-out";
        ToolRunner.run(configuration, new OrderGroupCount(), args);
        // Delete the intermediate output directory of the first job.
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.1.55:9000"), configuration);
        Path path = new Path("/groupcount-out");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
    }
}
执行命令时注意参数位置。程序从 args[1]、args[2]、args[3] 分别读取输入路径、输出路径和查询日期,因此必须传 4 个参数,第一个参数只是占位符。正确的执行命令应该如下(假如打包的jar放在hadoop根目录下的mylib,jar名称为groupcount):
bin/hadoop jar mylib/groupcount.jar gruopcount.OrderGroupCount ss /groupcount-in/t_product_sales.txt /ordergroupcount-out 2015-05