前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >键值对RDD数据分区

键值对RDD数据分区

作者头像
用户1483438
发布2022-05-06 15:29:06
2.2K0
发布2022-05-06 15:29:06
举报
文章被收录于专栏:大数据共享大数据共享

前言

Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

注意:

  1. 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
  2. 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

与分区器相关的算子

这些算子都必须为k-v类型,并且可以指定分区器。

  • partitionBy():按照K重新分区 函数签名:def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  • reduceByKey():按照K聚合V 函数签名:def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  • groupByKey():按照K重新分组 函数签名:def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  • reduceByKey():按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。 函数签名:def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  • aggregateByKey()():按照K处理分区内和分区间逻辑 函数签名:def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
  • foldByKey()():作用和reduceBykey一样,但是可以指定一个默认值 函数签名:def foldByKey(zeroValue: V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
  • combineByKey():转换结构后分区内和分区间操作 函数签名:def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null): RDD[(K, C)]

RDD中分区器

RDD中的分区器都是Partitioner的之类

代码语言:javascript
复制
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

默认有三个分区器+自定义分区

  1. HashPartitioner
  2. RangePartitioner
  3. PythonPartitioner(由spark内部使用,我们无法使用)
  4. 自定义分区器,继承Partitioner抽象类,自己实现分区。

所以主要了解HashPartitioner分区器,RangePartitioner分区器及自定义分区器。

Hash分区

HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key的所属的分区ID。

HashPartitioner分区器源码

代码语言:javascript
复制
// partitions 表示分区个数,是由用户指定的。
class HashPartitioner(partitions: Int) extends Partitioner { 
  // 进行断言,分区数不能小于等于0
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  // 绑定分区数
  def numPartitions: Int = partitions
  //对key进行计算,获取分区
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }
  // 这个不用管
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

HashPartitioner 计算分区的逻辑

代码语言:javascript
复制
  // 对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0)
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

案例演示:

代码语言:javascript
复制
  @Test
  def hashPartitionerTest(): Unit ={
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    val list=List("张三"->18,"李四"->19,"王五"->20,"赵六"->21,"村长"->22,"福来"->23,"钱多多"->24,"房栋栋"->25)

    val rdd1: RDD[(String, Int)] = sc.parallelize(list, 4)

    val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
    value.mapPartitionsWithIndex((index,it)=>{
      println(index,it.toList)
      it
    }).collect

  }

RDD默认分区就是HashPartitioner,分区就是调用者(RDD)的分区数

代码语言:javascript
复制
(3,List((房栋栋,CompactBuffer(25))))
(1,List((张三,CompactBuffer(18)), (李四,CompactBuffer(19)), (钱多多,CompactBuffer(24)), (王五,CompactBuffer(20))))
(2,List((村长,CompactBuffer(22)), (福来,CompactBuffer(23))))
(0,List((赵六,CompactBuffer(21))))

也可以明确指定分区器(new HashPartitioner(partitions)) partitions=分区个数

代码语言:javascript
复制
  @Test
  def hashPartitionerTest(): Unit ={
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    val list=List("张三"->18,"李四"->19,"王五"->20,"赵六"->21,"村长"->22,"福来"->23,"钱多多"->24,"房栋栋"->25)

    val rdd1: RDD[(String, Int)] = sc.parallelize(list, 4)

    val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey(new HashPartitioner(4))
    value.mapPartitionsWithIndex((index,it)=>{
      println(index,it.toList)
      it
    }).collect

  }

指定4个分区

代码语言:javascript
复制
(0,List((赵六,CompactBuffer(21))))
(1,List((张三,CompactBuffer(18)), (李四,CompactBuffer(19)), (钱多多,CompactBuffer(24)), (王五,CompactBuffer(20))))
(2,List((村长,CompactBuffer(22)), (福来,CompactBuffer(23))))
(3,List((房栋栋,CompactBuffer(25))))

Ranger分区

RangePartitionz作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中的数据量均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的映射到某一个分区内。

实现过程为:

  1. 先从整个RDD中采用水塘抽样算法,抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[Key]类型的数组变量rangeBounds;
  2. 判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的

 抽样范围计算
抽样范围计算

RangePartitioner 参数列表

代码语言:javascript
复制
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int, # 分区个数
    rdd: RDD[_ <: Product2[K, V]], # 指定对按个RDD 进行抽样
    private var ascending: Boolean = true, 指定排序规则(默认为升序,分区间是有序的,分区内不一定有序)
    val samplePointsPerPartitionHint: Int = 20)
  extends Partitioner {...}

rangeBounds 中决定了抽样范围

代码语言:javascript
复制
private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // Cast to double to avoid overflowing ints or longs
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        // 如果分区不均衡,重新采样
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
         // 计算种子
          val seed = byteswap32(-rdd.id - 1)
          // 调用 sample 进行采样。
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }

具体的采样实现

代码语言:javascript
复制
  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      // 计算种子
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect() // 会再次运算job任务
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }

getPartition 会使用到rangeBounds,计算key获取对应分区。

代码语言:javascript
复制
 def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

对这一块源码,还不是很清楚,暂时记录一下。

案例演示:

代码语言:javascript
复制
  @Test
  def rangePartitionerTest(): Unit ={
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 创建本地集合RDD
    val list=List("1"->18,"2"->19,"3"->20,"4"->21,"5"->22,"6"->23,"7"->24,"8"->25)
    val rdd1: RDD[(String, Int)] = sc.parallelize(list, 4)

    //对数据进行分区,并使用RangePartitioner分区器,
    val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey(new RangePartitioner(4, rdd1))

    value.mapPartitionsWithIndex((index,it)=>{
      println(index,it.toList)
      it
    }).collect

    value.foreach(e=>{
      println(e._1,e._2.toList)
    })

  }

各个分区数据

代码语言:javascript
复制
(0,List((2,CompactBuffer(19)), (1,CompactBuffer(18))))
(2,List((5,CompactBuffer(22)), (6,CompactBuffer(23))))
(3,List((8,CompactBuffer(25)), (7,CompactBuffer(24))))
(1,List((4,CompactBuffer(21)), (3,CompactBuffer(20))))

Hash 与 Range的区别

  1. hash 是通过对key取hashcode%分区数(如果小于0就加上分区数,否则+0)的方式指定分区;Range是通过对RDD进行抽样,指定一个区间。然后计算key,确认key具体在那个区间中。
  2. hash 只是单纯的对key进行运算,不会重新运算job任务,range需要对分区进行抽样,需要运行一个job任务。
  3. RDD默认为HashPartitioner 分区器,即使不指定分区器默认的就是。Ragen需要明确指定。

自定义分区

上面说过,我们能使用spark 分区器的就有两种,HashPartitioner和RangePartitioner;很多时候根据业务的需求,需要自定义分区。如下数据: 需求要求 a,b,c华为一个分区,d,e,f换分为一个分区,剩下的分为一个分区。

代码语言:javascript
复制
 val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)

依样画葫芦 我们也许不知道怎么自定义一个分区,那么可以看看spark 自带的是怎么写的;如HashPartitioner

代码语言:javascript
复制
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

查看 HashPartitioner的父类(Partitioner)

代码语言:javascript
复制
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

总结:

  • 继承 Partitioner类,它是一个抽象类。
  • 实现父类的numPartitions函数
  • 实现父类的getPartition 函数

自定义分区器

代码语言:javascript
复制
/**
 * 自定义分区器
 * partitions 默认为3
 * @param partitions
 */
class CustomPartitioner(partitions: Int) extends Partitioner{
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case "a"|"b"|"c" =>1
    case "d"|"e"|"f" =>2
    case _=>0
  }
}

使用自定义分区器

代码语言:javascript
复制
 @Test
  def partitionByTest(): Unit ={

    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)

    val rdd3: RDD[(String, Int)] = rdd.partitionBy(new CustomPartitioner(3))

    rdd3.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

  }

结果

代码语言:javascript
复制
0=List((g,100), (h,100), (i,100), (j,100))
1=List((a,100), (b,100), (c,100))
2=List((d,100), (e,100), (f,100))

注意:若出现这种序列化问题

代码语言:javascript
复制
org.apache.spark.SparkException: Task not serializable

解决方式:

  1. CustomPartitioner 重新定义class文件创建
  2. 不要再 classobject 中创建(如下)
代码语言:javascript
复制
class Test{
 class CustomPartitioner(partitions: Int) extends Partitioner{
...
 }
}

抽离出class,在外面定义

代码语言:javascript
复制
class Test{
...
}
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
  1. 若在一个class文件中创建,请使外部实现Serializable接口
代码语言:javascript
复制
class Test extends Serializable {
 class CustomPartitioner(partitions: Int) extends Partitioner{
...
 }
}
  1. 实现Serializable接口之后,出现部分属性无法序列化,可以使用 @transient 注解忽略。
代码语言:javascript
复制
class Test extends Serializable {
 @transient
 val name="a"
 class CustomPartitioner(partitions: Int) extends Partitioner{
 ...
 }
}

该问题的原因:

Driver最终会将Task交给Executor进行执行,其中就需要进行将对象进行序列化,由于CustomPartitioner类在另一个class内部中,序列化CustomPartitioner就需要将外部类先进性序列化。而外部类并没有进行序列化,所以就报了这样的错。


数据倾斜

无论是HashPartitioner还是RangePartitioner都可能会有数据倾斜的问题产生,但是需要注意的是,出现数据倾斜是数据的原因,而不是分区器的原因,是需要单独处理的。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 与分区器相关的算子
  • RDD中分区器
  • Hash分区
  • Ranger分区
  • Hash 与 Range的区别
  • 自定义分区
  • 数据倾斜
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档