已经安装好hadoop 能上传文件到hdfs hadoop版本:2.7
<!-- Build properties: source encoding and the single Hadoop version shared by all Hadoop artifacts below. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.7.2</hadoop.version>
</properties>
<!-- Dependencies for a Hadoop 2.7 MapReduce client application (HDFS, common, YARN, MR client). -->
<dependencies>
<!-- Unit-testing only; consider <scope>test</scope> so JUnit is not packaged with the job jar. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
需求:有以下数据,对该数据统计每个单词的出现次数
hello word
hello page
123456 789
生如夏花 死如秋叶
mapper
/**
 * Word-count mapper: splits each input line on single spaces and emits
 * (word, 1) for every token.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Reusable output objects — avoids allocating two Writables per token,
    // a standard MapReduce idiom for hot map() paths.
    private final Text word = new Text();
    private static final LongWritable ONE = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        for (String token : line.split(" ")) {
            // Consecutive spaces produce empty tokens; skip them so the
            // counts only contain real words.
            if (token.isEmpty()) {
                continue;
            }
            word.set(token);
            context.write(word, ONE);
        }
    }
}
reduce
/**
 * Word-count reducer: sums the per-word 1-counts emitted by the mapper and
 * writes (word, total).
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        // Write the incoming key directly — the original rebuilt an identical
        // Text from key.toString(), which is a needless allocation.
        context.write(key, new LongWritable(count));
    }
}
App
/**
 * Driver for the word-count job: wires mapper/reducer, key/value classes,
 * and HDFS input/output paths, then runs the job to completion.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    // Class used to locate the job jar on the cluster.
    job.setJarByClass(App.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    // Map and final output types (identical here, but set explicitly).
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    // NOTE(review): the output dir lives inside the input dir and must not
    // already exist, or the job fails on submission — confirm paths.
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    // Propagate the job result to the shell instead of discarding it.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
需求:有以下数据,对该数据进行去重处理
192.168.234.21
192.168.234.22
192.168.234.21
192.168.234.21
192.168.234.23
192.168.234.21
192.168.234.21
192.168.234.21
192.168.234.25
192.168.234.21
192.168.234.21
192.168.234.26
192.168.234.21
192.168.234.27
192.168.234.21
192.168.234.27
192.168.234.21
192.168.234.29
192.168.234.21
192.168.234.26
192.168.234.21
192.168.234.25
192.168.234.25
192.168.234.21
192.168.234.22
192.168.234.21
mapper
// Dedup mapper: emits the whole input line as the key with a NullWritable
// value; the shuffle groups identical lines, so the reducer sees each
// distinct line exactly once.
public class DistinctMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable,Text,Text,NullWritable>.Context context) throws IOException, InterruptedException {
// The line itself is the key; NullWritable carries no payload.
context.write(value,NullWritable.get());
}
}
reducer
// Dedup reducer: each distinct line arrives as one key; writing the key once
// (ignoring the grouped values) yields the de-duplicated output.
public class DistinctReduce extends Reducer<Text,NullWritable,Text,NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text,NullWritable,Text,NullWritable>.Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
App
/**
 * Driver for the dedup job: Text keys, NullWritable values, identity-style
 * reduce collapses duplicate lines.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(App.class);
    job.setMapperClass(DistinctMapper.class);
    job.setReducerClass(DistinctReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    // Surface success/failure via the process exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
需求:有以下数据,求出他们的平均值
tom 69
tom 84
tom 68
jary 89
jary 90
jary 81
jary 35
rose 23
rose 100
rose 230
mapper
/**
 * Average-score mapper: parses lines of the form "name score" and emits
 * (name, score) with the score widened to a float.
 */
public class AvgMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        // Field 0 is the student name, field 1 the integer score.
        context.write(new Text(fields[0]),
                new FloatWritable(Integer.parseInt(fields[1])));
    }
}
reduce
/**
 * Average-score reducer: averages all scores seen for one name and writes
 * (name, average).
 */
public class AvgReduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {

    @Override
    protected void reduce(Text key, Iterable<FloatWritable> values, Context context)
            throws IOException, InterruptedException {
        // Primitive accumulators — the original used boxed Integer/Float,
        // which autoboxes on every loop iteration for no benefit.
        int count = 0;
        float sum = 0f;
        for (FloatWritable value : values) {
            sum += value.get();
            count++;
        }
        // count is always >= 1 here: reduce() is only invoked for keys that
        // have at least one value.
        context.write(key, new FloatWritable(sum / count));
    }
}
App
/**
 * Driver for the average-score job.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(App.class);
    job.setMapperClass(AvgMapper.class);
    job.setReducerClass(AvgReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FloatWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    // Surface success/failure via the process exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
需求:假设我们需要处理一批有关天气的数据,其格式如下:
按照ASCII码存储,每行一条记录。每行共24个字符(包含符号在内)
第9、10、11、12字符为年份,第20、21、22、23字符代表温度,求每年的最高温度
2329999919500515070000
9909999919500515120022
9909999919500515180011
9509999919490324120111
6509999919490324180078
9909999919370515070001
9909999919370515120002
9909999919450515180001
6509999919450324120002
8509999919450324180078
mapper
/**
 * Weather mapper: each fixed-width record carries the year in characters
 * 9-12 and the temperature in characters 19-22 (0-based substrings 8..12
 * and 18..22); emits (year, temperature).
 */
public class MaxMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String record = value.toString();
        // Fixed-position fields per the record layout described above.
        String year = record.substring(8, 12);
        int temperature = Integer.parseInt(record.substring(18, 22));
        context.write(new Text(year), new IntWritable(temperature));
    }
}
reduce
/**
 * Max-temperature reducer: writes the largest temperature observed for each
 * year key.
 */
public class MaxReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Seed with MIN_VALUE, not 0 — seeding with 0 silently reports 0 for
        // any year whose temperatures are all negative.
        int max = Integer.MIN_VALUE;
        for (IntWritable temp : values) {
            if (temp.get() > max) {
                max = temp.get();
            }
        }
        context.write(key, new IntWritable(max));
    }
}
App
/**
 * Driver for the per-year maximum-temperature job.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(App.class);
    job.setMapperClass(MaxMapper.class);
    job.setReducerClass(MaxReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    // Surface success/failure via the process exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
需求:有以下数据统计出每个用户的流量使用情况
12321445 zs bj 343
12321312 ww sh 234
12321445 zs bj 343
12321312 ww cd 234
12345678 zs cd 156
新建FlowPojo类
// Custom Hadoop Writable carrying one traffic record: subscriber name, phone
// number, address and the flow (traffic) amount. Field order in
// write()/readFields() must match exactly — Hadoop serialization is
// positional, not named.
public class FlowPojo implements Writable {
private String name;
private String phone;
private String addr;
private long flow;
// No-arg constructor required by Hadoop's reflective deserialization.
public FlowPojo() {
}
public FlowPojo(String name, String phone, String addr, long flow) {
this.name = name;
this.phone = phone;
this.addr = addr;
this.flow = flow;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
public long getFlow() {
return flow;
}
public void setFlow(long flow) {
this.flow = flow;
}
// toString() is what TextOutputFormat writes as the record value.
@Override
public String toString() {
return "[" +
"name:'" + name + '\'' +
", phone:'" + phone + '\'' +
", addr:'" + addr + '\'' +
", flow:" + flow +
']';
}
// Serialize fields in declaration order; writeUTF requires non-null strings.
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(name);
dataOutput.writeUTF(phone);
dataOutput.writeUTF(addr);
dataOutput.writeLong(flow);
}
// Deserialize in the same order as write().
@Override
public void readFields(DataInput dataInput) throws IOException {
this.name = dataInput.readUTF();
this.phone = dataInput.readUTF();
this.addr = dataInput.readUTF();
this.flow = dataInput.readLong();
}
}
mapper
/**
 * Traffic mapper: parses "phone name addr flow" records and emits
 * (phone, FlowPojo) so the reducer can total flow per phone number.
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowPojo> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        // Field layout: phone, name, addr, flow.
        FlowPojo record = new FlowPojo(
                fields[1],
                fields[0],
                fields[2],
                Long.parseLong(fields[3]));
        context.write(new Text(record.getPhone()), record);
    }
}
reduce
/**
 * Traffic reducer: totals the flow for one phone number and carries the
 * identifying fields (name/phone/addr) into the output record.
 */
public class FlowReduce extends Reducer<Text, FlowPojo, Text, FlowPojo> {

    @Override
    protected void reduce(Text key, Iterable<FlowPojo> values, Context context)
            throws IOException, InterruptedException {
        FlowPojo result = new FlowPojo();
        long totalFlow = 0L;
        for (FlowPojo value : values) {
            totalFlow += value.getFlow();
            // BUGFIX: the original tested value.getName().isEmpty(), which is
            // inverted — the identifying fields were only copied when the name
            // was empty, so they stayed null in the output. Copy them once,
            // the first time through the loop.
            if (result.getName() == null) {
                result.setName(value.getName());
                result.setPhone(value.getPhone());
                result.setAddr(value.getAddr());
            }
        }
        result.setFlow(totalFlow);
        // (Removed a leftover System.out.println debug statement.)
        context.write(key, result);
    }
}
App
/**
 * Driver for the per-phone traffic-total job.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(App.class);
    job.setMapperClass(FlowMapper.class);
    job.setReducerClass(FlowReduce.class);
    job.setMapOutputKeyClass(Text.class);
    // BUGFIX: the value class must be the Writable the mapper/reducer emit
    // (FlowPojo), not the mapper class itself — the original passed
    // FlowMapper.class, which fails at runtime during serialization.
    job.setMapOutputValueClass(FlowPojo.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowPojo.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    // Surface success/failure via the process exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
需求:有以下数据,统计每个月的利润,按月分区
mapper
/**
 * Profit mapper: parses "month profit" records and emits (month, profit).
 */
public class ProfitMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        // Field 0 is the month, field 1 the profit amount.
        context.write(new Text(fields[0]),
                new FloatWritable(Float.parseFloat(fields[1])));
    }
}
reduce
/**
 * Profit reducer: sums all profit values for one month and writes the total.
 */
public class ProfitReduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {

    @Override
    protected void reduce(Text key, Iterable<FloatWritable> values, Context context)
            throws IOException, InterruptedException {
        float total = 0f;
        for (FloatWritable value : values) {
            total += value.get();
        }
        context.write(key, new FloatWritable(total));
    }
}
App
/**
 * Driver for the per-month profit job with three reduce tasks.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(App.class);
    // NOTE(review): 3 reducers but no setPartitionerClass — months are spread
    // by the default HashPartitioner, not by an explicit month partition.
    // If true month-based partitioning is required, add a Partitioner.
    job.setNumReduceTasks(3);
    job.setMapperClass(ProfitMapper.class);
    job.setReducerClass(ProfitReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FloatWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    // Surface success/failure via the process exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
mapper
// Traffic mapper (keyed by name): parses "phone addr name flow" records into
// a FlowPojo and emits (name, FlowPojo).
public class FlowsMapper extends Mapper<LongWritable, Text,Text, FlowPojo> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] datas = value.toString().split(" ");
FlowPojo flowPojo = new FlowPojo();
// Field layout here is phone, addr, name, flow (differs from FlowMapper).
flowPojo.setPhone(datas[0]);
flowPojo.setAddr(datas[1]);
flowPojo.setName(datas[2]);
flowPojo.setFlow(Long.parseLong(datas[3]));
context.write(new Text(flowPojo.getName()),flowPojo);
}
}
reduce
/**
 * Traffic reducer (keyed by name): totals the flow for one person; the
 * name/addr/phone fields end up holding the values from the last record
 * iterated, matching the original behavior.
 */
public class FlowsReduce extends Reducer<Text, FlowPojo, Text, FlowPojo> {

    @Override
    protected void reduce(Text key, Iterable<FlowPojo> values, Context context)
            throws IOException, InterruptedException {
        FlowPojo result = new FlowPojo();
        long total = 0L;
        for (FlowPojo record : values) {
            total += record.getFlow();
            result.setName(record.getName());
            result.setAddr(record.getAddr());
            result.setPhone(record.getPhone());
        }
        result.setFlow(total);
        context.write(key, result);
    }
}
需求:有以下数据,按地区进行分区,做流量统计
13877779999 bj zs 2145
13766668888 sh ls 1028
13766668888 sh ls 9987
13877779999 bj zs 5678
13544445555 sz ww 10577
13877779999 sh zs 2145
13766668888 sh ls 9987
FlowPartition
/**
 * Routes records to reducers by address: bj -> 0, sh -> 1, sz -> 2 and any
 * other address -> 3. Requires four reduce tasks.
 */
public class FlowPartition extends Partitioner<Text, FlowPojo> {

    @Override
    public int getPartition(Text text, FlowPojo flowPojo, int i) {
        switch (flowPojo.getAddr()) {
            case "bj":
                return 0;
            case "sh":
                return 1;
            case "sz":
                return 2;
            default:
                return 3;
        }
    }
}
mapper
/**
 * Traffic mapper (keyed by name): parses "phone addr name flow" records into
 * a FlowPojo and emits (name, FlowPojo). The FlowPartition class then routes
 * each record by its addr field.
 */
public class FlowsMapper extends Mapper<LongWritable, Text, Text, FlowPojo> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        FlowPojo record = new FlowPojo();
        // Field layout: phone, addr, name, flow.
        record.setPhone(fields[0]);
        record.setAddr(fields[1]);
        record.setName(fields[2]);
        record.setFlow(Long.parseLong(fields[3]));
        context.write(new Text(record.getName()), record);
    }
}
reduce
// Traffic reducer (keyed by name): totals flow per person. name/addr/phone
// keep whatever the last iterated record carried, so if one person appears
// under several addresses only one of them survives into the output.
public class FlowsReduce extends Reducer<Text, FlowPojo, Text,FlowPojo> {
@Override
protected void reduce(Text key, Iterable<FlowPojo> values, Context context) throws IOException, InterruptedException {
FlowPojo flowPojo = new FlowPojo();
for (FlowPojo value : values) {
// Accumulate into the pojo's own flow field (starts at 0).
flowPojo.setFlow(flowPojo.getFlow()+value.getFlow());
flowPojo.setName(value.getName());
flowPojo.setAddr(value.getAddr());
flowPojo.setPhone(value.getPhone());
}
context.write(key,flowPojo);
}
}
App
/**
 * Driver for the partitioned traffic job: four reduce tasks matching the
 * four partitions produced by FlowPartition (bj/sh/sz/other).
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(App.class);
    // Partitioner and reducer count must agree: FlowPartition returns 0-3.
    job.setPartitionerClass(FlowPartition.class);
    job.setNumReduceTasks(4);
    job.setMapperClass(FlowsMapper.class);
    job.setReducerClass(FlowsReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowPojo.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowPojo.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    // Surface success/failure via the process exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
需求:有三个成绩文件,chinese.txt,english.txt,math.txt,计算每个人 三个月,每科的总成绩
chinese.txt
1 zhang 89
2 zhang 73
3 zhang 67
1 wang 49
2 wang 83
3 wang 27
english.txt
1 zhang 55
2 zhang 69
3 zhang 75
1 wang 44
2 wang 64
3 wang 86
math.txt
1 zhang 85
2 zhang 59
3 zhang 95
1 wang 74
2 wang 67
3 wang 96
新建StudentPojo
// Custom Hadoop Writable holding one student's scores in three subjects.
// Field order in write()/readFields() must match exactly — Hadoop
// serialization is positional.
public class StudentPojo implements Writable {
private String name;
private int math;
private int chinese;
private int english;
// No-arg constructor required by Hadoop's reflective deserialization.
public StudentPojo() {
}
public StudentPojo(String name, int math, int chinese, int english) {
this.name = name;
this.math = math;
this.chinese = chinese;
this.english = english;
}
// toString() is what TextOutputFormat writes as the record value.
@Override
public String toString() {
return "[" +
"name:'" + name + '\'' +
", math:" + math +
", chinese:" + chinese +
", english:" + english +
']';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getMath() {
return math;
}
public void setMath(int math) {
this.math = math;
}
public int getChinese() {
return chinese;
}
public void setChinese(int chinese) {
this.chinese = chinese;
}
public int getEnglish() {
return english;
}
public void setEnglish(int english) {
this.english = english;
}
// Serialize fields in declaration order; writeUTF requires a non-null name.
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(name);
dataOutput.writeInt(math);
dataOutput.writeInt(chinese);
dataOutput.writeInt(english);
}
// Deserialize in the same order as write().
@Override
public void readFields(DataInput dataInput) throws IOException {
this.name = dataInput.readUTF();
this.math = dataInput.readInt();
this.chinese = dataInput.readInt();
this.english = dataInput.readInt();
}
}
mapper
/**
 * Score mapper: each line is "month name score". The subject is inferred
 * from the source file name (math.txt / chinese.txt / english.txt) obtained
 * from the input split; emits (name, StudentPojo) with only that subject's
 * field populated.
 */
public class ScoreMapper extends Mapper<LongWritable, Text, Text, StudentPojo> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        // The file this split came from tells us which subject we are reading.
        FileSplit split = (FileSplit) context.getInputSplit();
        String fileName = split.getPath().getName();

        StudentPojo student = new StudentPojo();
        student.setName(fields[1]);
        int score = Integer.parseInt(fields[2]);
        switch (fileName) {
            case "math.txt":
                student.setMath(score);
                break;
            case "chinese.txt":
                student.setChinese(score);
                break;
            case "english.txt":
                student.setEnglish(score);
                break;
        }
        context.write(new Text(fields[1]), student);
    }
}
reduce
// Score reducer: sums each subject across the (sparse) per-subject records
// emitted by ScoreMapper, producing one StudentPojo with the totals for all
// three subjects per student.
public class ScoreReduce extends Reducer<Text, StudentPojo,Text,StudentPojo> {
@Override
protected void reduce(Text key, Iterable<StudentPojo> values, Context context) throws IOException, InterruptedException {
StudentPojo studentPojo = new StudentPojo();
studentPojo.setName(key.toString());
for (StudentPojo value : values) {
// Each incoming record has exactly one non-zero subject, so summing
// all three fields accumulates per-subject totals correctly.
studentPojo.setMath(studentPojo.getMath()+value.getMath());
studentPojo.setChinese(studentPojo.getChinese()+value.getChinese());
studentPojo.setEnglish(studentPojo.getEnglish()+value.getEnglish());
}
context.write(key,studentPojo);
}
}
App
/**
 * Driver for the per-student subject-total job.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(App.class);
    job.setMapperClass(ScoreMapper.class);
    job.setReducerClass(ScoreReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(StudentPojo.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(StudentPojo.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    // Surface success/failure via the process exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
需求:有以下数据,根据热度对电影进行降序排序
惊天破 72
机械师2 83
奇异博士 67
但丁密码 79
比利林恩的中场战事 94
侠探杰克:永不回头 68
龙珠Z:复活的弗利萨 79
长城 56
夺路而逃 69
神奇动物在哪里 57
驴得水 68
我不是潘金莲 56
你的名字 77
大闹天竺 96
捉迷藏 78
凶手还未睡 23
魔发精灵 68
勇士之门 35
罗曼蒂克消亡史 67
小明和他的小伙伴们 36
MoviePojo
/**
 * Movie record used as the MapReduce key so the shuffle sorts movies by
 * heat. compareTo orders by descending hot value.
 */
public class MoviePojo implements WritableComparable<MoviePojo> {

    private String name;
    private int hot;

    /** No-arg constructor required by Hadoop's reflective deserialization. */
    public MoviePojo() {
    }

    public MoviePojo(String name, int hot) {
        this.name = name;
        this.hot = hot;
    }

    /** TextOutputFormat writes this representation as the output key. */
    @Override
    public String toString() {
        return "name:'" + name + '\'' +
                ", hot:" + hot ;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    /**
     * Descending order by hot. Uses Integer.compare instead of subtraction:
     * `o.getHot() - this.hot` can overflow for extreme values and silently
     * invert the ordering. Note ties compare equal, so two movies with the
     * same heat are grouped as one key during the shuffle.
     */
    @Override
    public int compareTo(MoviePojo o) {
        return Integer.compare(o.getHot(), this.hot);
    }

    /** Serialize in field order; readFields must mirror this exactly. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.name);
        dataOutput.writeInt(this.hot);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.hot = dataInput.readInt();
    }
}
mapper
/**
 * Movie mapper: parses "name hot" lines into a MoviePojo key (sorted by the
 * shuffle via MoviePojo.compareTo) with a NullWritable value.
 */
public class MovieMapper extends Mapper<LongWritable, Text, MoviePojo, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        MoviePojo movie = new MoviePojo();
        movie.setName(fields[0]);
        movie.setHot(Integer.parseInt(fields[1]));
        context.write(movie, NullWritable.get());
    }
}
App
/**
 * Driver for the movie-heat sort job. No reducer is set, so the default
 * identity reducer passes through keys already sorted by MoviePojo.compareTo.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(App.class);
    job.setMapperClass(MovieMapper.class);
    job.setOutputKeyClass(MoviePojo.class);
    job.setOutputValueClass(NullWritable.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    // Surface success/failure via the process exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
需求:有这样一组数字,要求利用3个reduce来处理,并且生成的三个结果文件,是整体有序的。
将1到100装入第一个分区,100到1000装入第二个分区,大于1000的装入第三个分区
82 239 231
23 22 213
123 232 124
213 3434 232
4546 565 123
231 231
2334 231
1123 5656 657
12313 4324 213
123 2 232 32
343 123 4535
12321 3442 453
1233 342 453
1231 322 452
232 343 455
3123 3434 3242
TotalsortPartition
/**
 * Range partitioner for globally-sorted output over three reducers:
 * keys below 100 -> partition 0, [100, 1000) -> partition 1,
 * >= 1000 -> partition 2. Because within-partition sort is ascending,
 * concatenating the three result files yields a totally ordered sequence.
 */
public class TotalsortPartition extends Partitioner<LongWritable, IntWritable> {

    @Override
    public int getPartition(LongWritable longWritable, IntWritable intWritable, int i) {
        // Primitive long — the original boxed into Long and then unboxed
        // on every comparison for no benefit.
        long val = longWritable.get();
        if (val < 100) {
            return 0;
        } else if (val < 1000) {
            return 1;
        } else {
            return 2;
        }
    }
}
mapper
/**
 * Total-sort mapper: splits each line on single spaces and emits
 * (number, 1) so the shuffle sorts the numbers and the reducer counts
 * duplicates.
 */
public class TotalsortMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split(" ")) {
            // parseLong returns a primitive; Long.valueOf in the original
            // boxed it only to have LongWritable unbox again.
            context.write(new LongWritable(Long.parseLong(token)), new IntWritable(1));
        }
    }
}
reduce
/**
 * Total-sort reducer: sums the 1-counts for each number and writes
 * (number, occurrences); keys arrive in ascending order per partition.
 */
public class TotalsortReduce extends Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int occurrences = 0;
        for (IntWritable one : values) {
            occurrences += one.get();
        }
        context.write(key, new IntWritable(occurrences));
    }
}
App
/**
 * Driver for the total-sort job: three reducers fed by TotalsortPartition so
 * the three output files are globally ordered when concatenated.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(App.class);
    // Partition count must match the partitioner's range count (0..2).
    job.setPartitionerClass(TotalsortPartition.class);
    job.setNumReduceTasks(3);
    job.setMapperClass(TotalsortMapper.class);
    job.setReducerClass(TotalsortReduce.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.19.4:9000/count"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.19.4:9000/count/result"));
    // Surface success/failure via the process exit code.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}