专栏首页波波烤鸭MapReduce之分区器(Partitioner)

MapReduce之分区器(Partitioner)

Partitactioner

  Partitioner 组件可以对 MapTask后的数据按Key进行分区,从而将不同分区的Key交由不同的Reduce处理。这个也是我们经常会用到的功能。

1.使用场景

  比如上个案例中我们统计出来了每个用户的流量数据,那么我们接下来想把统计的用户数据根据不同的手机号输出到不同的文件中,那么这时使用分区器就非常合适了。

2.HashPartitioner

  在一般的 MapReduce 过程中,我们知道可以通过 job.setNumReduceTasks(N) 来创建多个 ReducerTask 进行处理任务。可是这种情况下,系统会调用默认的Partitioner也就是 HashPartitioner来对Map的 key 进行分区。进入 Hadoop 的源码,可以看到 HashPartitioner 的实现其实很简单。如下:

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    // key的hash值与integer的最大值取与然后对ReduceTask的个数取余
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

  hash的好处是可以很key的分布更加随机,但是这样会将一些不同的key放在同一个分区中,这并不是我们所期望的。

3.自定义Partitioner

  面对HashPartitioner所具有的局限,我们可以通过自定义Partitioner来解决,如下:

3.1 实现自定义分区器

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 自定义分区器
 * @author 波波烤鸭
 *
 */
public class CustomPartitioner extends Partitioner< Text, Flow>{
	
	private static Map<String, Integer> map = new HashMap<>();
	
	// 此处我们将数据写死,实际开发中我们应该从对应的数据源中获取数据然后存储在缓存中(Redis)
	static{
		map.put("138", 0);
		map.put("139", 1);
		map.put("158", 2);
		map.put("159", 3);
	}

	/**
	 * 根据key获取对应的分区号
	 * @param key 就是用的手机号码
	 * @param value 统计的用户的信息
	 */
	@Override
	public int getPartition(Text key, Flow value, int numPartitions) {
		// 获取手机号码的前3位 138
		String prefix = key.toString().substring(0, 3);
		return map.containsKey(prefix)?map.get(prefix):4;
	}
}

3.2 启动类设置

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration(true);
		conf.set("mapreduce.framework.name", "local");
		// 输出到HDFS文件系统中
		// conf.set("fs.defaultFS", "hdfs://hadoop-node01:9000");
		// 输出到本地文件系统
		conf.set("fs.defaultFS", "file:///");
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(FlowTest.class);
		
		// 设置ReduceTask的个数
		job.setNumReduceTasks(5);
		// 设置自定义的分区器
		job.setPartitionerClass(CustomPartitioner.class);
		
		// 指定本job要使用的map/reduce的工具类
		job.setMapperClass(MyMapTask.class);
		job.setReducerClass(MyReduceTask.class);
		
		// 指定mapper输出kv的类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Flow.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Flow.class);
		
		// 指定job的原始文件输入目录
		// 6.设置输出输出类
		FileInputFormat.setInputPaths(job, new Path("c:/tools/bigdata/mr/flow/input/"));
		FileOutputFormat.setOutputPath(job, new Path("c:/tools/bigdata/mr/flow/output/"));
				
		//将job中配置的相关参数,以及job所用的jar包提交给yarn运行
		//job.submit();  waitForCompletion等待执行完成
		boolean flag = job.waitForCompletion(true);
		System.exit(flag?0:1);

	}

  MapTask和ReduceTask的代码内容不需要改变,可以参考上篇内容。

Ok ~ partitioner的作用就是用来对Map之后的数据做分区处理操作!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Hadoop之MapReduce04【客户端源码分析】

      Configuration 用来存储相关的配置文件。在该类中有一段static代码块

    用户4919348
  • Maven教程2(Eclipse配置及maven项目)

      此处报错的原因是jdk版本问题,我们使用的maven的3.6.0jdk必须是1.7+当前使用的是1.5.所以我们需要修改jdk的版本,解决方式有两种。

    用户4919348
  • 好好编程-物流项目13【登录认证-shiro实现】

      我们已经完成了用户的CRUD操作。本文我们来介绍下基于Shiro的登录认证操作。

    用户4919348
  • Lua table之弱引用

    Lua采用了基于垃圾收集的内存管理机制,因此对于程序员来说,在很多时候内存问题都将不再困扰他们。然而任何垃圾收集器都不是万能的,在有些特殊情况下,垃圾收集器是无...

    晚晴幽草轩轩主
  • 小白博客 反弹shell 在公网服务器执行 nc –lvv 8888

    Lua采用了基于垃圾收集的内存管理机制,因此对于程序员来说,在很多时候内存问题都将不再困扰他们。然而任何垃圾收集器都不是万能的,在有些特殊情况下,垃圾收集器是...

    奶糖味的代言
  • Cheerio,服务端的JQuery。

    cheerio 是nodejs特别为服务端定制的,能够快速灵活的对JQuery核心进行实现。它工作于DOM模型上,且解析、操作、呈送都很高效。

    笔阁
  • 1056 组合数的和 (15 分)

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    韩旭051
  • 3000类目标检测--R-FCN-3000 at 30fps: Decoupling Detection and Classification

    R-FCN-3000 at 30fps: Decoupling Detection and Classification Code will be mad...

    用户1148525
  • Hive之导出文件按逗号分隔到本地文件

        如下所示,默认导出的是用\t分隔的,需要使用管道符进行转换,经常使用到,记录下.

    克虏伯
  • kubernetes v1.11 二进制部署(二)之Openssl自签TLS证书

    节点kubelet的公钥与私钥:是通过boostrap响应的方式,在启动kubelet自动会产生, 然后在master通过csr请求,就会产生。 那么知道这些...

    Devops海洋的渔夫

扫码关注云+社区

领取腾讯云代金券