前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark中distinct是如何实现的?

spark中distinct是如何实现的?

作者头像
CoreDao
发布2021-04-13 10:42:51
1.4K0
发布2021-04-13 10:42:51
举报
文章被收录于专栏:BigData_FlinkBigData_Flink

A1 总述:

去重

A2 思路:

map -> resuceByKey -> map

A3 源码:

3.1 有参:
代码语言:javascript
复制
 /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }
//numPartitions:分区数
3.2 无参:
代码语言:javascript
复制
/**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = withScope {
    distinct(partitions.length)
  }
//partitions.length:分区数
3.3 解释

我们从源码中可以看到,distinct去重主要实现逻辑是

代码语言:javascript
复制
 map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

这个过程是,先通过map映射每个元素和null,然后通过key(此时是元素)统计{reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。},最后再同过map把去重后的元素挑出来。

A4 测试代码

代码语言:javascript
复制
import org.apache.spark.{SparkConf, SparkContext}
object TransformationsFun {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("transformation_operator")
    val sc = new SparkContext(conf)
    //这里的3是初设定的partition数
    val rdd = sc.parallelize(List(1, 2, 3, 3, 3, 3, 8, 8, 4, 9), 3)
    //因为distinct实现用reduceByKey故其可以重设定partition数,这里设定4
    rdd.distinct(4).foreach(println)
    //这里执行时,每次结果不同,分区在4以内,每个分区处理的元素也不定
    sc.stop()
  }
}

图解:

在这里插入图片描述
在这里插入图片描述

解释:这里仅供理解,在实际运行中,分区会随机使用以及每个分区处理的元素也随机,所以每次运行结果会不同。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-03-09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • A1 总述:
  • A2 思路:
  • A3 源码:
    • 3.1 有参:
      • 3.2 无参:
        • 3.3 解释
        • A4 测试代码
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档