前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hashpartitioner-Spark分区计算器

hashpartitioner-Spark分区计算器

作者头像
Spark学习技巧
发布2018-03-20 14:48:44
1K0
发布2018-03-20 14:48:44
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

一点点回忆

年初了,帮助大家回忆一下spark的重要知识点。

首先,我们回顾的知识点是RDD的五大特性:

1,一系列的分区。

2,一个函数作用于分区上。

3,RDD之间有一系列的依赖。

4,分区器。

5,最佳位置。

Spark属于链式计算,rdd之间有着依赖关系:窄依赖,宽依赖。

RDD执行的时候会将计算链条分为很多task,rdd的task分为:ResultTask和ShuffleMapTask。

Partitioner简介

书归正传,RDD之间的依赖如果是宽依赖,那么上游RDD该如何确定每个分区的输出将交由下游RDD的哪些分区呢?Spark提供了分区计算器来解决这个问题。ShuffleDependency的partitioner属性的类型是partitioner,抽象类Partitioner定义了分区计算器的接口规范,ShuffleDependency的分区取决于Partitioner的具体实现。Partitioner的定义如下:

代码语言:javascript
复制
abstract class Partitioner extends Serializable {
 def numPartitions: Int
 def getPartition(key: Any): Int
}

Partitioner的numPartitions方法用于获取分区数量。Partitioner的getPartition方法用于将输入的key映射到下游的RDD的从0到numPartitions-1这个范围中的某一个分区中去。

Partitioner根据不同的需求有着具体的实现类,在idea打开源码,在该抽象类上按下F4键,可以看到继承关系,如下图:

本分区系列,会将CoalescedPartitioner,GridPartitioner,HashPartitioner,RangePartitioner及自定义分区器逐个介绍。本文重点在hashPartitioner。

HashPartitioner

首先,我们先看HashPartitioner的源码实现。

代码语言:javascript
复制
class HashPartitioner(partitions: Int) extends Partitioner {
 require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

 def numPartitions: Int = partitions

 def getPartition(key: Any): Int = key match {
 case null => 0
 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

 override def equals(other: Any): Boolean = other match {
 case h: HashPartitioner =>
      h.numPartitions == numPartitions
 case _ =>
 false
 }

 override def hashCode: Int = numPartitions
}

根据上面代码我们可以看到,其传入的参数partitions,决定总的分区数,重写的numPartitions方法也只是简单返回该值。重写的getPartition方法实际上是以key的hashcode和numPartitions作为参数调用了Utils工具类的nonNegativeMod方法,该方法的具体实现如下:

代码语言:javascript
复制
def nonNegativeMod(x: Int, mod: Int): Int = {
 val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

nonNegativeMod方法将对key的hashCode和numPartitions进行取模运算,得到key对应的分区索引。使用哈希和取模的方式,可以方便地计算出下游RDD的各个分区将具体处理哪些key。由于上游RDD所处理的key的哈希值在取模后很可能产生数据倾斜,所以HashPartitioner并不是一个均衡的分区计算器。

根据HashPartitioner的实现,我们知道ShuffleDependency中的分区依赖关系并不再是一对一的,而取决于key,并且当前RDD的某个分区将可能依赖于ShuffleDependcy的RDD的任何一个分区。我们分析的内容可以作图如下:

用HashPartitioner的RDD算子

举几个常见的使用HashPartitioner的例子。

Reducebykey

代码语言:javascript
复制
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}

aggregateByKey

代码语言:javascript
复制
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
 combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}

join

代码语言:javascript
复制
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
  join(other, defaultPartitioner(self, other))
}

简单介绍defaultPartitioner

简单看一一下,上面代码片段提到的defaultPartitioner方法,该方法的源码如下:

代码语言:javascript
复制
 def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
 val rdds = (Seq(rdd) ++ others)
 val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
 if (hasPartitioner.nonEmpty) {
      hasPartitioner.maxBy(_.partitions.length).partitioner.get
    } else {
 if (rdd.context.conf.contains("spark.default.parallelism")) {
 new HashPartitioner(rdd.context.defaultParallelism)
      } else {
 new HashPartitioner(rdds.map(_.partitions.length).max)
      }
    }
  }
}

该方法首先,会获取带分区数大于零的RDD,然后假如不为空,就采用分区数最大的RDD的分区器当做得到的分区器返回。

假如,都没有分区器,就会默认给定一个HashPartitioner分区器,前面我们也说到了HashPartitioner分区构建的时候要传入一个分区数的参数。这里获取分区数的方式,首先是判断是否设置了spark.default.parallelism参数,假如有的话,可以对rdd.context.defaultParallelism进行追述,最终假如是集群模式调用的是CoarseGrainedSchedulerBackend的下面方法:

override def defaultParallelism(): Int = {

conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))

}

结果是假如设定了该参数,采用我们的设定值。没设定的话总core数和2取最大值作为分区数。

假如,没有没有spark.default.parallelism配置则其值就是父RDD分区最大的RDD的分区数。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-02-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档