前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Transformation转换算子之Key-Value类型

Transformation转换算子之Key-Value类型

作者头像
用户1483438
发布2022-04-27 14:57:22
6520
发布2022-04-27 14:57:22
举报
文章被收录于专栏:大数据共享

分类:

  1. partitionBy() 按照K重新分区
  2. 自定义分区
  3. reduceByKey()按照K聚合V
  4. groupByKey()按照K重新分组
  5. reduceByKey和groupByKey区别
  6. aggregateByKey()按照K处理分区内和分区间逻辑
  7. foldByKey()分区内和分区间相同的aggregateByKey()
  8. combineByKey()转换结构后分区内和分区间操作

SparkContext

SparkContext 定义成在全局范围,配置如下;

代码语言:javascript
复制
  val conf=new SparkConf().setMaster("local[2]").setAppName("test")
  val sc=new SparkContext(conf)

partitionBy()

将RDD[K,V]中的K按照指定Partitioner重新进行分区; 如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。

定义一个集合,存放map元素(a-j);默认分区4个

代码语言:javascript
复制
    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)

查看默认分区情况

代码语言:javascript
复制
    rdd.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

结果: 几乎比较均匀的分配到各个分区中

代码语言:javascript
复制
0=List((a,100), (b,100)) 
1=List((c,100), (d,100), (e,100))
2=List((f,100), (g,100))
3=List((h,100), (i,100), (j,100))

使用 partitionBy 按照key进行分区

partitionBy 源码,需要让指定一个分区器(Partitioner)

代码语言:javascript
复制
 def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

查看spark有那些分区器

  • HashPartitioner:默认的分区器,通过对key进行hash运算,取余分区数的方式计算分区
  • RangePartitioner:
  • PythonPartitioner:spark内部使用的,外部无法使用
  • 自定义分区:开发者能使用的只有HashPartitionerRangePartitioner两种,若都无法满足我们的需求,就只能自定义分区器了。

使用HashPartitioner 作为 partitionBy的分区器

代码语言:javascript
复制
//  HashPartitioner 需要指定一个分区数
val rdd2: RDD[(String, Int)] = rdd.partitionBy(new HashPartitioner(2))
//  查看分区情况
rdd2.mapPartitionsWithIndex((index,it)=>{
  println(s"$index=${it.toList}")
  it
}).collect

结果

代码语言:javascript
复制
1=List((a,100), (c,100), (e,100), (g,100), (i,100))
0=List((b,100), (d,100), (f,100), (h,100), (j,100))

注意:

  1. 包位置需要指定为org.apache.spark.HashPartitioner
  2. 需要指定一个分区数new HashPartitioner(分区数) 完整代码
代码语言:javascript
复制
  @Test
  def partitionByTest(): Unit ={

    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)


    rdd.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

    val rdd2: RDD[(String, Int)] = rdd.partitionBy(new HashPartitioner(2))

    rdd2.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

  }

源码分析HashPartitioner是如何进行分区的

代码语言:javascript
复制
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  // 将分区数传给numPartitions
  def numPartitions: Int = partitions
  // 这里才是重点;
  def getPartition(key: Any): Int = key match {
    // 通过模式匹配,判断key是否为null,若为null指定到0分区
    case null => 0
    // 获取 key的hashCode ; numPartitions 传入进来的分区数,也就是2
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }
  // 这个不太重要
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

nonNegativeMod: 真正计算出分区的地方

代码语言:javascript
复制
// x:上面传入的 hashcode 
// mod :上面传入的分区数
 def nonNegativeMod(x: Int, mod: Int): Int = {
    // hashcode取余分区数  ,取余出来的可能会是一个负数
    // 例如:scala> "hadoop".hashCode %2
    // res10: Int = -1
    val rawMod = x % mod
    // 分区数肯定不能为负数,于是他做了这样的判断,if (rawMod < 0)
    // 比如 rawMod =-1 ;就使用 mod + rawMod = 1 ;否则 mod +0;
    // 这样就很好的解决了取余后为分区可能负数的情况了。
    rawMod + (if (rawMod < 0) mod else 0)
  }

自定义分区

上面说过,我们能使用spark 分区器的就有两种,HashPartitioner和RangePartitioner;很多时候根据业务的需求,需要自定义分区。如下数据: 需求要求 a,b,c华为一个分区,d,e,f换分为一个分区,剩下的分为一个分区。

代码语言:javascript
复制
 val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)

依样画葫芦 我们也许不知道怎么自定义一个分区,那么可以看看spark 自带的是怎么写的;如HashPartitioner

代码语言:javascript
复制
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

查看 HashPartitioner的父类(Partitioner)

代码语言:javascript
复制
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

总结:

  • 继承 Partitioner类,它是一个抽象类。
  • 实现父类的numPartitions函数
  • 实现父类的getPartition 函数

自定义分区器

代码语言:javascript
复制
/**
 * 自定义分区器
 * partitions 默认为3
 * @param partitions
 */
class CustomPartitioner(partitions: Int) extends Partitioner{
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case "a"|"b"|"c" =>1
    case "d"|"e"|"f" =>2
    case _=>0
  }
}

使用自定义分区器

代码语言:javascript
复制
 @Test
  def partitionByTest(): Unit ={

    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)

    val rdd3: RDD[(String, Int)] = rdd.partitionBy(new CustomPartitioner(3))

    rdd3.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

  }

结果

代码语言:javascript
复制
0=List((g,100), (h,100), (i,100), (j,100))
1=List((a,100), (b,100), (c,100))
2=List((d,100), (e,100), (f,100))

注意:若出现这种序列化问题

代码语言:javascript
复制
org.apache.spark.SparkException: Task not serializable

解决方式:

  1. CustomPartitioner 重新定义class文件创建
  2. 不要再 classobject 中创建(如下)
代码语言:javascript
复制
class Test{
 class CustomPartitioner(partitions: Int) extends Partitioner{
...
 }
}

抽离出class,在外面定义

代码语言:javascript
复制
class Test{
...
}
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
  1. 若在一个class文件中创建,请使外部实现Serializable接口
代码语言:javascript
复制
class Test extends Serializable {
 class CustomPartitioner(partitions: Int) extends Partitioner{
...
 }
}
  1. 实现Serializable接口之后,出现部分属性无法序列化,可以使用 @transient 注解忽略。
代码语言:javascript
复制
class Test extends Serializable {
 @transient
 val name="a"
 class CustomPartitioner(partitions: Int) extends Partitioner{
 ...
 }
}

该问题的原因:

Driver最终会将Task交给Executor进行执行,其中就需要进行将对象进行序列化,由于CustomPartitioner类在另一个class内部中,序列化CustomPartitioner就需要将外部类先进性序列化。而外部类并没有进行序列化,所以就报了这样的错。


reduceByKey()

功能说明:该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。

  • 需求:考试成绩下来了,统计语文,数学,英语各成绩的总成绩
代码语言:javascript
复制
    val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
代码语言:javascript
复制
val result= rdd1.reduceByKey((x, y) => {
  x+y
}).collect

println(result.toList)

结果

代码语言:javascript
复制
List((数学,69), (英语,162), (语文,100))
  • 原理分析 查看分区情况
代码语言:javascript
复制
   rdd1.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

结果

代码语言:javascript
复制
0=List((语文,10), (语文,20), (数学,15), (语文,30), (数学,33))
1=List((英语,12), (语文,40), (数学,21), (英语,50), (英语,100))

原理图:

 reduceByKey执行流程图
reduceByKey执行流程图

上传上去不太清楚的酱紫,我也

说明:

  1. 根据分区大小(这里设置分区数为2),设置将数据写入分布到各个分区中,
代码语言:javascript
复制
0=List((语文,10), (语文,20), (数学,15), (语文,30), (数学,33))
1=List((英语,12), (语文,40), (数学,21), (英语,50), (英语,100))
  1. 程序运行时会将数据写入缓冲区中(MapReduce流程差不多),缓冲区处于内存中,无法无限存入数据,所以会溢写入磁盘中。
  2. 在不影响程序最终结果的情况下使用combiner可以更好的提高效率,在reduceByKey中无论如何都会进行一次combiner(用于提高效率)。

  combiner
combiner

  1. 对数据按照key进行分组,并再次调用 reduce程序代码(如下),对单个组的数据进行聚合运算
代码语言:javascript
复制
 val result2=rdd1.reduceByKeyLocally((x, y) => {x + y})
  1. 计算结果完成后再将数据溢写入磁盘。
  2. rdd2 类似于reduce,他会对分区类的数据再进进行聚合统计

 reduce-rdd2
reduce-rdd2
  1. 最终得到想要的数据结果
代码语言:javascript
复制
List((数学,69), (英语,162), (语文,100))
  • 完整代码
代码语言:javascript
复制
  @Test
  def reduceByKeyTest(): Unit ={
    val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

    val result= rdd1.reduceByKey((x, y) => {
      x+y
    }).collect

    println(result.toList)

   rdd1.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

    val result2=rdd1.reduceByKeyLocally((x, y) => {
      x + y
    })

    println(result2.toList)
  }
  • 总结:

reduceByKey(func: (RDD Value值类型,RDD value值) => RDD value值): 根据key分组之后,所有该key的value值进行聚合 reduceByKey里面的函数是针对每个组的所有value值操作 reduceByKey 会经过一次shuffle

groupByKey()

groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。 该操作可以指定分区器或者分区数(默认使用HashPartitioner)

groupByKey 有三个重载方法

  • groupByKey(partitioner: Partitioner) 指定一个分区器
  • groupByKey(): 底层实现就是调用了groupByKey(partitioner: Partitioner) 默认的分区器为HashPartitioner 分区器的分区数默认为最开始配置的大小(2)
  • groupByKey(numPartitions: Int) 底层实现也是调用groupByKey(partitioner: Partitioner); 并直接定分区器为HashPartitioner 创建HashPartitioner分区器同时,也为其指定了分区数大小numPartitions

案例:使用默认的无参的groupByKey()

代码语言:javascript
复制
 @Test
  def groupByKeyTest(): Unit ={
    val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
    val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
    println(value.collect.toList)
  }

结果:

代码语言:javascript
复制
List((数学,CompactBuffer(15, 33, 21)), (英语,CompactBuffer(12, 50, 100)), (语文,CompactBuffer(10, 20, 30, 40)))

reduceByKey和groupByKey区别

  1. reduceByKey存在combiner行为,性能更高
  2. groupByKey不存在conbiner行为,性能比较低

工作中推荐使用reduceByKey这种高性能shuffle算子

aggregateByKey()

foldByKey()

在scala中也有fold()函数,与reduce()唯一的区别就是,reduce会把第一个列表中第一个元作为参数的默认值,而fold(),可以指定一个默认值,其他操作和fold与reduce没有什么不同。在spark中foldByKey()和reduceBykey()亦是如此。

foldByKey 参数说明

代码语言:javascript
复制
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, defaultPartitioner(self))(func)
}

这种((zeroValue: V)(func: (V, V) => V))语法称为:函数柯里化(zeroValue: V):需要指定一个默认值; (func: (V, V) => V):具体的操作逻辑

案例: 统计各科总成绩,校长心情比较好,决定在总成绩的分数上再加一百分

代码语言:javascript
复制
  @Test
  def foldByKeyTest(): Unit ={
    val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

   // 使用 foldByKey 数据汇总
    val value: RDD[(String, Int)] = rdd.foldByKey(100)((v1, v2) => {
      v1 + v2
    })

    println(value.collect.toList)

  }

结果:

代码语言:javascript
复制
List((数学,269), (英语,262), (语文,300))

aggregateByKey()

combineByKey()

combineByKey 参数说明:

代码语言:javascript
复制
def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }
  • createCombiner(转换数据的结构) combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。 简单说明:在combiner阶段对每个组的第一个vlaue值进行转换
  • mergeValue(分区内) 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。 简单说明:combiner的聚合逻辑
  • mergeCombiners(分区间) 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。 简单说明:reduce的聚合逻辑

案例: 求每门学科的平均成绩 输入如下:

代码语言:javascript
复制
val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

完整代码如下:

代码语言:javascript
复制
@Test
  def combineByKeyTest(): Unit ={
    val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
    // 求每门学科的平均成绩
    val value: RDD[(String, (Int, Int))] = rdd.combineByKey(v => (v, 1), (c: (Int, Int), v: Int) => (c._1+v, c._2+1), (c1: (Int, Int), c2: (Int, Int)) => (c1._1+c2._1, c1._2+c2._2))

    //获得各科成绩的总分数和个数
    val result=value.collect.toList
    println(result)

    result.foreach(m=>{
      m match {
        case (curr,(totalScore,size)) => println(s"课程:$curr,总分:$totalScore,个数:$size,平均数:${totalScore.toDouble/size}")
      }
    })

结果:

代码语言:javascript
复制
List((数学,(69,3)), (英语,(162,3)), (语文,(100,4)))
课程:数学,总分:69,个数:3,平均数:23.0
课程:英语,总分:162,个数:3,平均数:54.0
课程:语文,总分:100,个数:4,平均数:25.0

代码分析(我觉得挺复杂的,有点太抽象了); 首先我们应该理解的时候,combineByKey各个阶段的操作都是针对于 value 而言,毕竟时BykeycombineByKey 中的C,V其实表示的含义就是传入的value,返回的也是一个value

代码语言:javascript
复制
val value: RDD[(String, (Int, Int))] = rdd.combineByKey(
 v => (v, 1),
 (c: (Int, Int), v: Int) => (c._1+v, c._2+1), (c1: (Int, Int), 
 c2: (Int, Int)) => (c1._1+c2._1, c1._2+c2._2)
)

第一个参数:createCombiner createCombiner: V => C :他希望我们传入进来C的是一个value,就是一个个分数(10,20,15等),返回的V将作为下一个函数参数的C(mergeValue) v => (v, 1) 此时结合我们的业务,统计平均数,我们首先得知道语文有多少个,数学有多少个,英语多少个。如何通过combineByKey来实现呢?结合createCombiner的特性在combiner阶段对每个组的第一个vlaue值进行转换,我们就可以将计算器(用1标识)存放到value中 结果应该是这样的。

代码语言:javascript
复制
"语文"->(10,1),"语文"->(20,1),"数学"->(15,1),"语文"->(30,1),"数学"->(33,1),
"英语"->(12,1),"语文"->(40,1),"数学"->(21,1),"英语"->(50,1),"英语"->(100,1)

由于是在combiner阶段,在combiner阶段对每个组的第一个vlaue值进行转换 语文:

代码语言:javascript
复制
"语文"->(10,1)

数学:

代码语言:javascript
复制
"数学"->(15,1)

英语:

代码语言:javascript
复制
"英语"->(12,1)

然后就是第二个参数mergeValue,他的解释是combiner的聚合逻辑;待会解释是什么意思 先看看mergeValue需要指定哪些参数 : (C, V) => C, C:就是上一个函数参数(createCombiner)返回的结果(如:(10,1),(20,1),(30,1)) V:表示带聚合的元素 返回的C将会作为下一个函数参数的C(mergeCombiners的参数C)。应该能明白了吧。

了解了mergeValue各个参数的意思及返回参数的意思之后,再次回到业务中。 (c: (Int, Int), v: Int) => (c._1+v, c._2+1), (c1: (Int, Int): 拿语文进行举例

代码语言:javascript
复制
"语文"->(10,1)

第一次:c._1 表示成绩也是 元素(10,1),v: 表示 下一个带聚合的元素(20,1+1), 结果就是(30,2) 第二次:c._1 表示上一个结果(30,2),v: 表示 下一个带聚合的元素(30,2+1),结果就是(60,3) 第二次:c._1 表示上一个结果(60,3),v: 表示 下一个带聚合的元素(40,3+1),结果就是(100,4)

最终结果

代码语言:javascript
复制
语文 -> (100,4)
数学 -> (69,3)
英语 -> (162,3)

mergeValue之后,最终溢写到磁盘

mergeCombiners 就比较简单了,就是一个reduce操作。 注意:我上面的方式是建立在一个分区情况下,多个分区也是一样的流程。 mergeCombiners 中就是将多个 分区进行最后的聚合处理。

原理图:

 combineByKey原理图
combineByKey原理图

我也是在学习阶段,理解可能没那么透彻,文章若有什么不对,希望可以指出来。

除了使用combineByKey可以使用reduceByKey的方式实现类似的功能,对比combineByKey还更简单一点。

代码语言:javascript
复制
  @Test
  def combineByKeyTest(): Unit ={
    val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

    // 通过 map方式实现一样的功能
    val rdd2=rdd.map(x=>{
      (x._1,(x._2,1))
    }).reduceByKey((v1,v2)=>{
      (v1._1+v2._1,v1._2+v2._2)
    })\
    println(rdd2.collect.toList)
 }

结果:

代码语言:javascript
复制
List((数学,(69,3)), (英语,(162,3)), (语文,(100,4)))

其实reduceByKey底层就是使用的是combineByKey combineByKey 底层实现

代码语言:javascript
复制
def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    // 注意这个函数
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

reduceByKey 底层实现

代码语言:javascript
复制
 def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

在往reduceByKey(defaultPartitioner(self), func)中点击

代码语言:javascript
复制
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // 注意这个函数
   // (v: V) => v ;自己转自己,啥都没干。
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

看到了吗? 都是使用的是combineByKeyWithClassTag来实现。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分类:
  • SparkContext
  • partitionBy()
  • 自定义分区
  • reduceByKey()
  • groupByKey()
  • reduceByKey和groupByKey区别
  • aggregateByKey()
  • foldByKey()
  • aggregateByKey()
  • combineByKey()
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档