前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark源码解析:DStream

Spark源码解析:DStream

作者头像
木东居士
发布2018-05-25 15:38:42
8450
发布2018-05-25 15:38:42
举报

0x00 前言

本篇是Spark源码解析的第二篇,主要通过源码分析Spark Streaming设计中最重要的一个概念——DStream。

本篇主要来分析Spark Streaming中的Dstream,重要性不必多讲,明白了Spark这个几个数据结构,容易对Spark有一个整体的把握。

和RDD那篇文章类似,虽说是分析Dstream,但是整篇文章会围绕着一个具体的例子来展开。算是对Spark Streaming源码的一个概览。

文章结构

  • Spark Streaming的一些概念,主要和Dstream相关
  • Dstream的整体设计
  • 通过一个具体例子深入讲解

0x01 概念

什么是Spark Streaming

Scalable, high-throughput, fault-tolerant stream processing of live data streams!

一个实时系统,或者说是准实时系统。详细不再描述。

提一点就是,Streaming 的任务最后都会转化为Spark任务,由Spark引擎来执行。

Dstream

It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream.

RDD 的定义是一个只读、分区的数据集(an RDD is a read-only, partitioned collection of records),而 DStream 又是 RDD 的模板,所以我们把 Dstream 也视同数据集。

我的简单理解,Dstream是在RDD上面又封了一层的数据结构。下面是官网对Dstream描述的图。

Spark Streaming和其它实时处理程序的区别

此处是来自Spark作者的论文,写的很好,我就不翻译了,摘出来我关注的点。

我们把实时处理框架分为两种:Record-at-a-time和D-Stream processing model。

Record-at-a-time:

D-Stream processing model:

两者的区别:

Record-at-a-time processing model. Each node continuously receives records, updates internal state, and sends new records. Fault tolerance is typically achieved through replication, using a synchronization protocol like Flux or DPC to ensure that replicas of each node see records in the same order (e.g., when they have multiple parent nodes). D-Stream processing model. In each time interval, the records that arrive are stored reliably across the cluster to form an immutable, partitioned dataset. This is then processed via deterministic parallel operations to compute other distributed datasets that represent program output or state to pass to the next interval. Each series of datasets forms one D-Stream.

Record-at-a-time的问题:

In a record-at-a-time system, the major recovery challenge is rebuilding the state of a lost, or slow, node.

0x02 源码分析

Dstream

A DStream internally is characterized by a few basic properties:

  • A list of other DStreams that the DStream depends on
  • A time interval at which the DStream generates an RDD
  • A function that is used to generate an RDD after each time interval

Dstream这个数据结构有三块比较重要。

  • 父依赖
  • 生成RDD的时间间隔
  • 一个生成RDD的function

这些对应到代码中的话如下,这些都会有具体的子类来实现,我们在后面的分析中就能看到。 下面先顺着例子一点点讲。

代码语言:javascript
复制
abstract class DStream[T: ClassTag] ( @transient private[streaming] var ssc: StreamingContext ) extends Serializable with Logging {
  /** Time interval after which the DStream generates an RDD */
  def slideDuration: Duration
  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]]
  /** Method that generates an RDD for the given time */
  def compute(validTime: Time): Option[RDD[T]]
  // RDDs generated, marked as private[streaming] so that testsuites can access it
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
  // Reference to whole DStream graph
  private[streaming] var graph: DStreamGraph = null
 }

举个栗子

官网最基本的wordcount例子,和Spark的类似。虽简单,但是代表性很强。

代码语言:javascript
复制
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  //

这里涉及到了Dstream之间的转换以及RDD的生成。在这里先看一下Dstream的转换。

Dstream依赖关系

Dstream的一些依赖关系还是要先弄明白的,不然不太容易理解。Dstream依赖图很大,我们只列几个这次关注的。

这里不再详细介绍每一个组件,只放一个图,后面在看源码的时候可以回过头再看,会更清晰。

1. 源码分析:StreamingContext

StreamingContext的主要组成,这里我们不再展开讲StreamingContext的作用,我们先讲这个具体的例子,后面会有专门的博客来分析其中一些主要的组件,比如DstreamGraph和JobGenerator。

  • JobScheduler : 用于定期生成Spark Job
  • JobGenerator
  • JobExecutor
  • DstreamGraph:包含Dstream之间依赖关系的容器
  • StreamingJobProgressListener:监听Streaming Job,更新StreamingTab
  • StreamingTab:Streaming Job的标签页
  • SparkUI负责展示
代码语言:javascript
复制
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {...}

先看第一行代码做了什么,val lines = ssc.socketTextStream("localhost", 9999),看过RDD源码的应该会记得,这一行代码就会做很多Dstream的转换,下面我们慢慢看。

socketTextStream 返回的时一个SocketInputDStream,那么SocketInputDStream是个什么东西?

代码语言:javascript
复制
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}

2. 源码分析:SocketInputDStream

这里我们看到SocketInputDStream其实继承了ReceiverInputDStream,这里就出现了第一层的继承关系,可以回头看一下前面的那个图。

它里面没做太多的东西,主要自己写了一个SocketReceiver,其余的主要方法都继承自ReceiverInputDStream。

代码语言:javascript
复制
class SocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

3. 源码分析:ReceiverInputDStream

ReceiverInputDStream是一个比较重要的类,有很大一部分的Dstream都继承于它。 比如说Kafka的InputDStream。所以说这是一个比较关键的类。

Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] that has to start a receiver on worker nodes to receive external data. Specific implementations of ReceiverInputDStream must define [[getReceiver]] function that gets the receiver object of type [[org.apache.spark.streaming.receiver.Receiver]] that will be sent to the workers to receive data.

注意: 这里重写了一个重要的方法compute。它决定了如何生成RDD。

另外ReceiverInputDStream继承自InputDStream。

代码语言:javascript
复制
abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
  extends InputDStream[T](_ssc) {
  /**
   * Generates RDDs with blocks received by the receiver of this stream. */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

4. 源码分析:InputDStream

InputDStream是一个比较重要的抽象,它是所有和Input相关Dstream的抽象类。比如FileInputDStream和我们刚才看的ReceiverInputDStream。

This is the abstract base class for all input streams. This class provides methods start() and stop() which are called by Spark Streaming system to start and stop receiving data, respectively. Input streams that can generate RDDs from new data by running a service/thread only on the driver node (that is, without running a receiver on worker nodes), can be implemented by directly inheriting this InputDStream. For example, FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for new files and generates RDDs with the new files. For implementing input streams that requires running a receiver on the worker nodes, use [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.

代码语言:javascript
复制
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) extends DStream[T](_ssc) {
override def dependencies: List[DStream[_]] = List()

  override def slideDuration: Duration = {
    if (ssc == null) throw new Exception("ssc is null")
    if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
    ssc.graph.batchDuration
  }
...
}

注意: 到这里,才看完了第一行代码,就是那个读数据的那一行。

5. 源码分析:Dstream.flatMap方法(以及Dstream如何生成RDD)

Dstream前面已经做过了一些介绍,不再赘述,这里开始按照例子的顺序向下讲。

看我们的第一个转换flatMap。返回了个FlatMappedDStream,并传入一个function。

代码语言:javascript
复制
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

下面转到FlatMappedDStream的分析,里面会设计到如何生存RDD的操作。

代码语言:javascript
复制
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}

DStream如何生成RDD?

Get the RDD corresponding to the given time; either retrieve it from cache or compute-and-cache it.

DStream 内部用一个类型是 HashMap 的变量 generatedRDD 来记录已经生成过的 RDD。

注意: compute(time)是用来生成rdd的。

代码语言:javascript
复制
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  / 从 generatedRDDs 里 来取rdd:如果有 rdd 就返回,没有 rdd 就进行 orElse 的代码
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    // 验证time是否valid
    if (isTimeValid(time)) {
      // 此处调用 compute(time) 方法获得 rdd 实例,并存入 rddOption 变量
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        // 这个函数在RDD的代码里面,看了一下不是很理解,只能通过注释知道大概意思是不检查输出目录。
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        // 将刚刚实例化出来的 rddOption 放入 generatedRDDs 对应的 time 位置
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}

6. 源码分析:Dstream.map方法

代码语言:javascript
复制
/** Return a new DStream by applying a function to all elements of this DStream. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }

此处值得说明一下,看compute函数parent.getOrCompute(validTime).map(_.map[U](mapFunc)),在这里同样调用了Dstream的getOrCompute函数,由于validTime已经存在,因此不重新生成RDD,而是从generatedRDDs中取出来。

然后再执行.map(_.map[U](mapFunc))这部分。

代码语言:javascript
复制
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

7. 源码分析:reduceByKey方法

有了看RDD源码的经验,我们很容易找到reduceByKey是在PairDStreamFunctions类中的。下面看一下它的源码。

Return a new DStream by applying reduceByKey to each RDD. The values for each key are merged using the supplied reduce function. org.apache.spark.Partitioner is used to control the partitioning of each RDD.

代码语言:javascript
复制
def reduceByKey(
    reduceFunc: (V, V) => V,
    partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
  combineByKey((v: V) => v, reduceFunc, reduceFunc, partitioner)
}

Combine elements of each key in DStream’s RDDs using custom functions. This is similar to the combineByKey for RDDs.

此处,我们仿佛看到了套路,感觉和RDD的设计何其的一致。

这里来了一个ShuffledDStream,具体的Shuffle过程可能会有一点小复杂,暂时不讲,关于shuffle的内容需要再详细地理解一下。

代码语言:javascript
复制
def combineByKey[C: ClassTag](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
  val cleanedCreateCombiner = sparkContext.clean(createCombiner)
  val cleanedMergeValue = sparkContext.clean(mergeValue)
  val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
  new ShuffledDStream[K, V, C](
    self,
    cleanedCreateCombiner,
    cleanedMergeValue,
    cleanedMergeCombiner,
    partitioner,
    mapSideCombine)
}

8. 源码分析:DStream.print方法

最后的打印函数也有点意思,它调用的时Dstream的print函数。

firstNum.take(num).foreach(println)这一句,打印出了rdd的内容。

代码语言:javascript
复制
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

然后呢?

我又发现了一个新的Dstream:ForEachDStream。按照注释来讲,上面的print的操作应该生成的时一个ForEachDStream不过,没找到代码。只能暂时搁置。

An internal DStream used to represent output operations like DStream.foreachRDD.

代码语言:javascript
复制
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

0x03 总结

至此,分析完了Dstream的相关源码,这篇和RDD那篇相对来讲都比较基础,主要是对整个流程的梳理,后续会对一些细节的点进行分析。

参考

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-05-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0x00 前言
    • 文章结构
    • 0x01 概念
      • 什么是Spark Streaming
        • Dstream
          • Spark Streaming和其它实时处理程序的区别
          • 0x02 源码分析
            • Dstream
              • 举个栗子
                • Dstream依赖关系
                  • 1. 源码分析:StreamingContext类
                    • 2. 源码分析:SocketInputDStream类
                      • 3. 源码分析:ReceiverInputDStream类
                        • 4. 源码分析:InputDStream类
                          • 5. 源码分析:Dstream.flatMap方法(以及Dstream如何生成RDD)
                            • 6. 源码分析:Dstream.map方法
                              • 7. 源码分析:reduceByKey方法
                                • 8. 源码分析:DStream.print方法
                                • 0x03 总结
                                • 参考
                                相关产品与服务
                                容器服务
                                腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                                领券
                                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档