专栏首页BigData_Flinkspark中distinct是如何实现的?

spark中distinct是如何实现的?

A1 总述:

去重

A2 思路:

map -> resuceByKey -> map

A3 源码:

3.1 有参:

 /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }
//numPartitions:分区数

3.2 无参:

/**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = withScope {
    distinct(partitions.length)
  }
//partitions.length:分区数

3.3 解释

我们从源码中可以看到,distinct去重主要实现逻辑是

 map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

这个过程是,先通过map映射每个元素和null,然后通过key(此时是元素)统计{reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。},最后再同过map把去重后的元素挑出来。

A4 测试代码

import org.apache.spark.{SparkConf, SparkContext}
object TransformationsFun {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("transformation_operator")
    val sc = new SparkContext(conf)
    //这里的3是初设定的partition数
    val rdd = sc.parallelize(List(1, 2, 3, 3, 3, 3, 8, 8, 4, 9), 3)
    //因为distinct实现用reduceByKey故其可以重设定partition数,这里设定4
    rdd.distinct(4).foreach(println)
    //这里执行时,每次结果不同,分区在4以内,每个分区处理的元素也不定
    sc.stop()
  }
}

图解:

解释:这里仅供理解,在实际运行中,分区会随机使用以及每个分区处理的元素也随机,所以每次运行结果会不同。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • HyperLogLog函数在Spark中的高级应用

    预聚合是高性能分析中的常用技术,例如,每小时100亿条的网站访问数据可以通过对常用的查询纬度进行聚合,被降低到1000万条访问统计,这样就能降低1000倍的数据...

    王知无-import_bigdata
  • Java中Httpsession是如何实现的?

      HTTP协议(http://www.w3.org/Protocols/)是“一次性单向”协议。 服务端不能主动连接客户端,只能被动等待并答复客户端请求。客...

    I Tech You_我教你
  • JS中setTimeout是如何实现的

    我们知道 Javascript引擎是单线程的,而setTimeout方法的作用是延后执行目标代码,同时还可以继续往下执行 setTimeout是如何实现的? 这...

    dys
  • Spark SQL 性能优化再进一步 CBO 基于代价的优化

    上文Spark SQL 内部原理中介绍的 Optimizer 属于 RBO,实现简单有效。它属于 LogicalPlan 的优化,所有优化均基于 Logical...

    Jason Guo
  • Spark SQL 性能优化再进一步 CBO 基于代价的优化

    上文Spark SQL 内部原理中介绍的 Optimizer 属于 RBO,实现简单有效。它属于 LogicalPlan 的优化,所有优化均基于 Logical...

    Jason Guo
  • OpenStack中的RESTful API是如何实现的?

    OpenStack作为一个开源的IaaS平台,各个组件和服务之间的消息传递都是通过RESTfulAPI和RPC传递,这里主要讲讲它是如何实现REST的。由于大家...

    Python中文社区
  • MySQL 是如何实现 ACID 中的 D 的?

    明显不会,磁盘IO太慢了,如果每个请求过来 MySQL 都要写磁盘,磁盘肯定扛不住。

    Java_老男孩
  • 【Java】大文本字符串滤重的简单方案

    一枝花算不算浪漫
  • SQL、Pandas和Spark:常用数据查询操作对比

    当今信息时代,数据堪称是最宝贵的资源。沿承系列文章,本文对SQL、Pandas和Spark这3个常用的数据处理工具进行对比,主要围绕数据查询的主要操作展开。

    luanhz

扫码关注云+社区

领取腾讯云代金券