前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RDD的join和Dstream的join有什么区别?

RDD的join和Dstream的join有什么区别?

作者头像
Spark学习技巧
发布2018-11-22 15:42:32
1.3K0
发布2018-11-22 15:42:32
举报
文章被收录于专栏:Spark学习技巧

有人在知识星球里问:

浪院长,RDD的join和Dstream的join有什么区别?

浪尖的回答:

DStream的join底层就是rdd的join。

下面,我们就带着疑问去验证以下,我们的想法。

2. DStream -> PairDStreamFunctions

Dstream这个类实际上支持的只是Spark Streaming的基础操作算子,比如: map, filterwindow.PairDStreamFunctions 这个支持key-valued类型的流数据

,支持的操作算子,如,groupByKeyAndWindow,join。这些操作,在有key-value类型的流上是自动识别的。

对于dstream -> PairDStreamFunctions自动转换的过程大家肯定想到的是scala的隐式转换。具体代码在Dstream的object内部。

代码语言:javascript
复制
implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
    PairDStreamFunctions[K, V] = {
    new PairDStreamFunctions[K, V](stream)
  }

假如,你对scala的隐式转换比较懵逼,请阅读下面文章。

Scala语法基础之隐式转换

3. PairDStreamFunctions的join

PairDStreamFunctions的join API总共有三种

代码语言:javascript
复制
/**
   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
    *
    *  通过join this和other Dstream的rdd构建出一个新的DStream.
    *  Hash分区器,用来使用默认的分区数来产生RDDs。
   */
  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {
    join[W](other, defaultPartitioner())
  }

  /**
   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
    *
    *  通过join this和other Dstream的rdd构建出一个新的DStream.
    *  Hash分区器,用来使用numPartitions分区数来产生RDDs。
   */
  def join[W: ClassTag](
      other: DStream[(K, W)],
      numPartitions: Int): DStream[(K, (V, W))] = ssc.withScope {
    join[W](other, defaultPartitioner(numPartitions))
  }

  /**
   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
   * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    * 通过join this和other Dstream的rdd构建出一个新的DStream.
    * 使用org.apache.spark.Partitioner来控制每个RDD的分区。
   */
  def join[W: ClassTag](
      other: DStream[(K, W)],
      partitioner: Partitioner
    ): DStream[(K, (V, W))] = ssc.withScope {
    self.transformWith(
      other,
      (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
    )
  }

上面所示代码中,第三个PairDStreamFunctions的join api 体现了join的操作,也即是函数:

代码语言:javascript
复制
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)

上面是两个RDD的join过程,并且指定了分区器。后面我们主要是关注该函数封装及调用。

其实,看过浪尖的Spark Streaming的视频的朋友或者度过浪尖关于Spark Streaming相关源码讲解的朋友应该有所了解的是。 这个生成RDD的函数应该是在 DStream的compute方法中在生成RDD的时候调用。假设你不了解也不要紧。 我们跟着代码轨迹前进,验证我们的想法。

DStream.transformWith

代码语言:javascript
复制
/**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream and 'other' DStream.
   */
  def transformWith[U: ClassTag, V: ClassTag](
      other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
    ): DStream[V] = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    val cleanedF = ssc.sparkContext.clean(transformFunc, false)
    transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
  }
  
    /**
     * Return a new DStream in which each RDD is generated by applying a function
     * on each RDD of 'this' DStream and 'other' DStream.
     */
    def transformWith[U: ClassTag, V: ClassTag](
        other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
      ): DStream[V] = ssc.withScope {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      val cleanedF = ssc.sparkContext.clean(transformFunc, false)
      val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
        assert(rdds.length == 2)
        val rdd1 = rdds(0).asInstanceOf[RDD[T]]
        val rdd2 = rdds(1).asInstanceOf[RDD[U]]
        cleanedF(rdd1, rdd2, time)
      }
      new TransformedDStream[V](Seq(this, other), realTransformFunc)
    }

经过上面两个 TransformWith操作,最终生成了一个TransformedDStream。需要关注的是new TransformedDStream[V](Seq(this, other), realTransformFunc) 第一个参数是一个包含要进行join操作的两个流的Seq。

那么,TransformedDStream 的parents 就包含了两个流。我们可以看到其 compute 方法的第一行。

代码语言:javascript
复制
override def compute(validTime: Time): Option[RDD[U]] = {
//    针对每一个流,获取其当前时间的RDD。
    val parentRDDs = parents.map { parent => parent.getOrCompute(validTime).getOrElse(
      // Guard out against parent DStream that return None instead of Some(rdd) to avoid NPE
      throw new SparkException(s"Couldn't generate RDD from parent at time $validTime"))
    }

compute的第一行就是获取parent中每个流,当前有效时间的RDD。然后调用,前面步骤封装的函数进行join。

代码语言:javascript
复制
val transformedRDD = transformFunc(parentRDDs, validTime)

以上就是join的全部过程。也是,验证浪尖所说的,DStream的join底层就是RDD的join。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-10-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2. DStream -> PairDStreamFunctions
  • 3. PairDStreamFunctions的join
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档