学习
实践
活动
专区
工具
TVP
写文章
专栏首页暴走大数据Spark Core源码精读计划6 | AsyncEventQueue与LiveListenerBus

Spark Core源码精读计划6 | AsyncEventQueue与LiveListenerBus

目录

  • 前言
  • 异步事件队列AsyncEventQueue
    • eventQueue、eventCount属性
    • droppedEventsCounter、lastReportTimestamp、logDroppedEvent属性
    • started、stopped属性
    • dispatchThread属性
    • dispatch()方法
    • post()方法
  • 异步事件总线LiveListenerBus
    • queues属性
    • queuedEvents属性
    • addToQueue()方法
    • post()、postToQueues()方法
  • 总结

前言

在上一篇文章中,我们了解了Spark事件总线机制的概况,以及ListenerBus、SparkListenerBus的细节。

由之前的分析可知,SparkListenerBus默认提供的事件投递方法是同步调用的。如果注册的监听器和产生的事件非常多,同步调用必然会造成事件的积压以及处理的延时。因此,在SparkListenerBus的实现类AsyncEventQueue中,提供了异步事件队列机制,它也是SparkContext中的事件总线LiveListenerBus的基础。本文就来研究它们。

异步事件队列AsyncEventQueue

老样子,首先来看类的声明及其内部的属性定义。

代码#6.1 - o.a.s.scheduler.AsyncEventQueue类声明及其属性

private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {
  import AsyncEventQueue._

  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

  private val eventCount = new AtomicLong()

  private val droppedEventsCounter = new AtomicLong(0L)

  @volatile private var lastReportTimestamp = 0L

  private val logDroppedEvent = new AtomicBoolean(false)

  private var sc: SparkContext = null

  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)

  private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
  private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")

  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      dispatch()
    }
  }

  // ...
}

该类的构造参数有四个,分别是队列名、Spark配置项、LiveListenerBus的监控度量,以及LiveListenerBus本身。下面来看一下它的主要属性。

eventQueue、eventCount属性

eventQueue是一个存储SparkListenerEvent事件的阻塞队列LinkedBlockingQueue。它的大小是通过配置参数spark.scheduler.listenerbus.eventqueue.capacity来设置的,默认值10000。如果不设置阻塞队列的大小,那么默认值会是Integer.MAX_VALUE,有OOM的风险。

eventCount则是当前待处理事件的计数。因为事件从队列中弹出不代表已经处理完成,所以不能直接用队列的实际大小来表示。它是AtomicLong类型的,以保证修改的原子性。

droppedEventsCounter、lastReportTimestamp、logDroppedEvent属性

droppedEventsCounter是被丢弃事件的计数。当阻塞队列已满后,新产生的事件无法入队,就会被丢弃。日志中定期输出该计数器的值,用lastReportTimestamp记录下每次输出的时间戳,并且输出后都会将计数器重新置为0。

logDroppedEvent用于指示是否发生过了事件丢弃的情况。它与droppedEventsCounter一样也都是原子类型的。

started、stopped属性

这两个属性分别用来标记队列的启动与停止状态。

dispatchThread属性

dispatchThread是将队列中的事件分发到各监听器的守护线程,实际上调用了dispatch()方法。而Utils.tryOrStopSparkContext()方法的作用在于执行代码块时如果抛出异常,就另外起一个线程关闭SparkContext。

下面就来看看dispatch()方法的源码。

dispatch()方法

代码#6.2 - o.a.s.scheduler.AsyncEventQueue.dispatch()方法

  private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
    var next: SparkListenerEvent = eventQueue.take()
    while (next != POISON_PILL) {
      val ctx = processingTime.time()
      try {
        super.postToAll(next)
      } finally {
        ctx.stop()
      }
      eventCount.decrementAndGet()
      next = eventQueue.take()
    }
    eventCount.decrementAndGet()
  }

可见,该方法循环地从事件队列中取出事件,并调用父类ListenerBus特征的postToAll()方法(文章#5已经讲过)将其投递给所有已注册的监听器,并减少计数器的值。“毒药丸”POISON_PILL是伴生对象中定义的一个特殊的空事件,在队列停止(即调用stop()方法)时会被放入,dispatcherThread取得它之后就会“中毒”退出循环。

有了处理事件的方法,还得有将事件放入队列的方法才完整。下面是入队的方法post()。

post()方法

代码#6.3 - o.a.s.scheduler.AsyncEventQueue.post()方法

  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }

    eventCount.incrementAndGet()
    if (eventQueue.offer(event)) {
      return
    }

    eventCount.decrementAndGet()
    droppedEvents.inc()
    droppedEventsCounter.incrementAndGet()
    if (logDroppedEvent.compareAndSet(false, true)) {
      logError(s"Dropping event from queue $name. " +
        "This likely means one of the listeners is too slow and cannot keep up with " +
        "the rate at which tasks are being started by the scheduler.")
    }
    logTrace(s"Dropping event $event")

    val droppedCount = droppedEventsCounter.get
    if (droppedCount > 0) {
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          val previous = new java.util.Date(prevLastReportTimestamp)
          logWarning(s"Dropped $droppedCount events from $name since $previous.")
        }
      }
    }
  }

该方法首先检查队列是否已经停止。如果是运行状态,就试图将事件event入队。若offer()方法返回false,表示队列已满,将丢弃事件的计数器自增,并标记有事件被丢弃。最后,若当前的时间戳与上一次输出droppedEventsCounter值的间隔大于1分钟,就在日志里输出它的值。

理解了AsyncEventQueue的细节之后,我们就可以进一步来看LiveListenerBus的实现了。

异步事件总线LiveListenerBus

AsyncEventQueue已经继承了SparkListenerBus特征,LiveListenerBus内部用到了AsyncEventQueue作为核心。来看它的声明以及属性的定义。

代码#6.4 - o.a.s.scheduler.LiveListenerBus类声明及其属性

private[spark] class LiveListenerBus(conf: SparkConf) {
  import LiveListenerBus._

  private var sparkContext: SparkContext = _

  private[spark] val metrics = new LiveListenerBusMetrics(conf)

  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)

  private val droppedEventsCounter = new AtomicLong(0L)

  @volatile private var lastReportTimestamp = 0L

  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

  @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()

  // ...
}

这里的属性与AsyncEventQueue大同小异,多出来的主要是queues与queuedEvents两个。

queues属性

queues维护一个AsyncEventQueue的列表,也就是说LiveListenerBus中会有多个事件队列。它采用CopyOnWriteArrayList来保证线程安全性。

queuedEvents属性

queuedEvents维护一个SparkListenerEvent的列表,它的用途是在LiveListenerBus启动成功之前,缓存可能已经收到的事件。在启动之后,这些缓存的事件会首先投递出去。

LiveListenerBus作为一个事件总线,也必须提供监听器注册、事件投递等功能,这些都是在AsyncEventQueue基础之上实现的,下面来看一看。

addToQueue()方法

代码#6.5 - o.a.s.scheduler.LiveListenerBus.addToQueue()方法

  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }

    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)

      case None =>
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }

该方法将监听器listener注册到名为queue的队列中。它会在queues列表中寻找符合条件的队列,如果该队列已经存在,就调用父类ListenerBus的addListener()方法直接注册监听器。反之,就先创建一个AsyncEventQueue,注册监听器到新的队列中。

LiveListenerBus还提供了另外4种直接注册监听器的方法,分别对应内置的4个队列,其名称在伴生对象中有定义。

代码#6.6 - LiveListenerBus中其他注册监听器的方法

  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, SHARED_QUEUE)
  }

  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
  }

  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, APP_STATUS_QUEUE)
  }

  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EVENT_LOG_QUEUE)
  }

post()、postToQueues()方法

代码#6.7 - o.a.s.scheduler.LiveListenerBus.post()与postToQueues()方法

  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }
    metrics.numEventsPosted.inc()

    if (queuedEvents == null) {
      postToQueues(event)
      return
    }

    synchronized {
      if (!started.get()) {
        queuedEvents += event
        return
      }
    }

    postToQueues(event)
  }

  private def postToQueues(event: SparkListenerEvent): Unit = {
    val it = queues.iterator()
    while (it.hasNext()) {
      it.next().post(event)
    }
  }

post()方法会检查queuedEvents中有无缓存的事件,以及事件总线是否还没有启动。投递时会调用postToQueues()方法,将事件发送给所有队列,由AsyncEventQueue来完成投递到监听器的工作。

总结

本文研究了与SparkContext相关的异步事件处理机制的实现,即AsyncEventQueue与LiveListenerBus。它们之间的关系可以用下面的简图来表示。

图#6.1 - AsyncEventQueue与LiveListenerBus示意

按照SparkContext初始化的顺序,下面将要介绍的是SparkEnv,内容也相当多,我们采用与研究SparkContext时类似的方式来阅读它的源码。

— THE END —

文章分享自微信公众号:
大数据真好玩

本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!

原始发表时间:2019-08-02
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • Spark Core源码精读计划18 | 与RDD的重逢

    在前面的17篇文章中,我们对以SparkContext和SparkEnv为中心展开的Spark Core底层支撑组件有了比较深入的理解,当然有一些重要的组件,会...

    大数据真好玩
  • Spark Core源码精读计划21 | Spark Block的基本实现

    前面我们用3篇文章的时间讲解了RDD的基础知识,包括其五要素、算子、依赖、分区以及检查点。实际上,与RDD相关的细节还有很多,渗透在之后的研究过程中。在时机合适...

    大数据真好玩
  • Spark Core源码精读计划11 | Spark广播机制的实现

    在RPC的领域里摸爬滚打了很长时间,是时候抽身出来看一看其他东西了。顺着SparkEnv初始化的思路继续看,下一个主要组件是广播管理器BroadcastMana...

    大数据真好玩
  • Spark Core源码精读计划19 | RDD的依赖与分区逻辑

    按照计划,本文来讲解RDD的依赖与分区器。这两者不仅与之后调度系统的细节(DAG、Shuffle等)息息相关,而且也是面试Spark系大数据研发工程师时经常被问...

    大数据真好玩
  • Spark Core源码精读计划12 | Spark序列化及压缩机制浅析

    《Spark Core源码精读计划3 | SparkContext辅助属性及后初始化》

    大数据真好玩
  • Spark Core源码精读计划5 | 事件总线及ListenerBus

    在讲解SparkContext组件初始化时,第一个初始化的内部组件就是LiveListenerBus,后面的组件很多都会依赖它,这从侧面说明事件总线是非常重要的...

    大数据真好玩
  • Spark Core源码精读计划14 | Spark Web UI界面的实现

    《Spark Core源码精读计划3 | SparkContext辅助属性及后初始化》

    大数据真好玩
  • Spark Core源码精读计划 | SparkContext组件初始化

    SparkContext在整个Spark Core中的地位毋庸置疑,可以说是核心中的核心。它存在于Driver中,是Spark功能的主要入口,如果没有Spark...

    大数据真好玩
  • Spark Core源码精读计划#28:磁盘存储DiskStore

    在上一篇文章中,我们认识了Spark管理磁盘块的组件DiskBlockManager,本文接着来看真正负责磁盘存储的组件DiskStore,以及与它相关的Blo...

    大数据真好玩
  • Spark Core源码精读计划7 | Spark执行环境的初始化

    《Spark Core源码精读计划3 | SparkContext辅助属性及后初始化》

    王知无-import_bigdata
  • Spark Core源码精读计划15 | 心跳接收器HeartbeatReceiver

    按照SparkContext初始化的顺序,下一个应该是心跳接收器HeartbeatReceiver。由于笔者感染乙流仍然没有痊愈,状态不好,文中若有疏漏,请批评...

    大数据真好玩
  • Spark Core源码精读计划13 | 度量系统MetricsSystem的建立

    《Spark Core源码精读计划3 | SparkContext辅助属性及后初始化》

    大数据真好玩
  • Spark Core源码精读计划#29:BlockManager主从及RPC逻辑

    通过前面几篇文章的讲解,我们就把Spark Core存储体系中的内存存储和磁盘存储逻辑基本上讲完了,而负责将这些组件统一管理并发挥作用的就是BlockManag...

    大数据真好玩
  • Spark Core源码精读计划4 | SparkContext提供的其他功能

    前面两篇文章一直在讲SparkContext初始化的内部逻辑,除此之外,它也对外提供一部分其他功能,我们挑选几个主要的来简要了解。SparkContext还有一...

    大数据真好玩
  • Spark Core源码精读计划9 | Spark RPC环境中的消息调度逻辑

    上一篇文章以NettyRpcEnv的概况结尾,对它内部的一些重要组件进行了简要的介绍。比起继续向下深挖,个人感觉现在平行地来搞比较合适,毕竟我们已经来到了相当底...

    大数据真好玩
  • Spark Core源码精读计划26 | 内存存储MemoryStore的具体实现

    前面我们已经对内存池MemoryPool、内存管理器MemoryManager有了比较深入的了解,接下来要介绍的就是MemoryStore,它负责Spark内存...

    大数据真好玩
  • Spark Core源码精读计划27 | 磁盘块管理器DiskBlockManager

    我们前面用4篇文章的时间讲解了Spark存储子系统中的内存部分,其内容相当多,包括内存池MemoryPool、内存管理器MemoryManager(包含两种实现...

    大数据真好玩
  • Spark Core源码精读计划17 | 上下文清理器ContextCleaner

    话休絮烦,本文讲解SparkContext初始化的最后一个组件——ContextCleaner,即上下文清理器。顾名思义,它扮演着Spark Core中垃圾收集...

    大数据真好玩
  • Spark Core源码精读计划16 | 通过ExecutorAllocationManager实现动态Executor分配

    按照SparkContext初始化的顺序,接下来就轮到调度系统的三大金刚——SchedulerBackend、TaskScheduler、DAGSchedule...

    大数据真好玩

扫码关注腾讯云开发者

领取腾讯云代金券