今天我们讲的第五步,Partition分区操作。
Partition分区:按照一定的分区规则,将key value的list进行分区。分区的创建分为默认的和自定义两种。
public class HashPartitioner<K,V> extends Partitioner<K,V>{
public int getPartition(K key,V value, int numReduceTasks){
return(key.hashCode()& Integer.MAX_VALUE) % numReduceTasks;
}
}
默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。既然用户不能控制存储到某个区间,能自定义么,答案是可以的。
Partitioner
,重写getPartition()
方法public class CustomPartitioner extends Partitioner<Text, FlowBean>{
@Override
public int getPartition(Text key,FlowBean value, int numReduceTasks){
// 控制分区代码逻辑
... ...
return Partition;
}
}
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(5);
假设自定义分区数为5,则
案例继续采用《MapReduce系列(4) | Hadoop序列化》中的文档,有需要文档的可以到此章自行复制保存。
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
package com.buwenbuhuo.partition;
import com.buwenbuhuo.flowsun.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author 卜温不火
* @create 2020-04-23 20:27
* com.buwenbuhuo.partition - the name of the target package where the new class or interface will be created.
* mapreduce0422 - the name of the current project.
*/
public class MyPartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
String phone = text.toString();
switch (phone.substring(0, 3)) {
case "136":
return 0;
case "137":
return 1;
case "138":
return 2;
case "139":
return 3;
default:
return 4;
}
}
}
package com.buwenbuhuo.partition;
/**
* @author 卜温不火
* @create 2020-04-23 14:14
* com.buwenbuhuo.flowsun - the name of the target package where the new class or interface will be created.
* mapreduce0422 - the name of the current project.
*/
import com.buwenbuhuo.flowsun.FlowBean;
import com.buwenbuhuo.flowsun.FlowDriver;
import com.buwenbuhuo.flowsun.FlowMapper;
import com.buwenbuhuo.flowsun.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PartitionerDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取job实例
Job job = Job.getInstance(new Configuration());
// 2.设置类路径
job.setJarByClass(PartitionerDriver.class);
// 3 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 8 指定自定义数据分区
job.setPartitionerClass(MyPartitioner.class);
// 9 同时指定相应数量的reduce task
job.setNumReduceTasks(5);
// 4 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path("d:\\input"));
FileOutputFormat.setOutputPath(job, new Path("d:\\output"));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}