前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划5 | 事件总线及ListenerBus

Spark Core源码精读计划5 | 事件总线及ListenerBus

作者头像
大数据真好玩
发布2019-08-08 15:50:22
9120
发布2019-08-08 15:50:22
举报
文章被收录于专栏:暴走大数据暴走大数据

目录

  • 前言
  • Spark事件总线概述
  • ListenerBus特征
    • addListener()与removeListener()方法
    • doPostEvent()方法
    • postToAll()方法
  • SparkListenerBus特征
    • SparkListenerInterface与SparkListenerEvent特征
  • 总结

前言

在讲解SparkContext组件初始化时,第一个初始化的内部组件就是LiveListenerBus,后面的组件很多都会依赖它,这从侧面说明事件总线是非常重要的支撑组件。在对SparkContext有了大致的了解之后,我们选择事件总线作为探索Spark底层的起点。

Spark事件总线概述

Spark中的事件总线采用监听器模式设计,其大致流程可以用下面的简图表示。

图#5.1 - 监听器事件总线

ListenerBus特征是Spark内所有事件总线实现的基类,下图示出ListenerBus的一部分继承结构。

图#5.2 - Spark事件总线的类结构

StreamingListenerBus与StreamingQueryListenerBus分别是Spark Streaming与Spark SQL中的组件,我们在这里不考虑它们。

本文先来看ListenerBus特征,以及Spark Core中用到的SparkListenerBus相关细节。下一篇文章则会详细分析AsyncEventQueue与LiveListenerBus。

ListenerBus特征

ListenerBus特征带有两个泛型参数L和E。L代表监听器的类型,并且它可以是任意类型的。E则代表事件的类型。

代码#5.1 - o.a.s.util.ListenerBus特征的声明和属性

代码语言:javascript
复制
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava

  //【以下都是方法】
}
代码语言:javascript
复制

属性listenersPlusTimers维护了所有注册在事件总线上的监听器以及它们对应计时器的二元组。计时器是可选的,用来指示监听器处理事件的时间。它采用了并发容器CopyOnWriteArrayList(前一篇文章简单提到过哦),以保证线程安全和支持并发修改。属性listeners就是将listenersPlusTimers中的监听器单独取出来,转化成java.util.List[L]类型。

ListenerBus特征中定义了一些基本的与事件总线相关的方法,如下。

addListener()与removeListener()方法
代码语言:javascript
复制
代码#5.2 - o.a.s.util.ListenerBus.addListener()与removeListener()方法  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }

  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }

顾名思义,这两个方法分别向事件总线中注册监听器与移除监听器。它们都是直接在CopyOnWriteArrayList上操作,因此是线程安全的。

doPostEvent()方法

代码#5.3 - o.a.s.util.ListenerBus.doPostEvent()方法

代码语言:javascript
复制
protected def doPostEvent(listener: L, event: E): Unit
代码语言:javascript
复制

这个方法将事件event投递给监听器listener进行处理。在ListenerBus中只提供了定义,具体逻辑须要由各个实现类来提供。

postToAll()方法

代码#5.4 - o.a.s.util.ListenerBus.postToAll()方法

代码语言:javascript
复制
  def postToAll(event: E): Unit = {    val iter = listenersPlusTimers.iterator    while (iter.hasNext) {      val listenerAndMaybeTimer = iter.next()      val listener = listenerAndMaybeTimer._1      val maybeTimer = listenerAndMaybeTimer._2      val maybeTimerContext = if (maybeTimer.isDefined) {        maybeTimer.get.time()      } else {        null      }      try {        doPostEvent(listener, event)        if (Thread.interrupted()) {          throw new InterruptedException()        }      } catch {        case ie: InterruptedException =>          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}.  " +            s"Removing that listener.", ie)          removeListenerOnError(listener)        case NonFatal(e) =>          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)      } finally {        if (maybeTimerContext != null) {          maybeTimerContext.stop()        }      }    }  }
代码语言:javascript
复制

这个方法通过调用doPostEvent()方法,将事件event投递给所有已注册的监听器。需要注意它是线程不安全的,因此调用方需要保证同时只有一个线程调用它。

SparkListenerBus特征

SparkListenerBus特征是Spark Core内部事件总线的基类,其代码如下。

代码#5.5 - o.a.s.scheduler.SparkListenerBus特征

代码语言:javascript
复制
代码语言:javascript
复制
private[spark] trait SparkListenerBus  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {  protected override def doPostEvent(      listener: SparkListenerInterface,      event: SparkListenerEvent): Unit = {    event match {      case stageSubmitted: SparkListenerStageSubmitted =>        listener.onStageSubmitted(stageSubmitted)      case stageCompleted: SparkListenerStageCompleted =>        listener.onStageCompleted(stageCompleted)      case jobStart: SparkListenerJobStart =>        listener.onJobStart(jobStart)      case jobEnd: SparkListenerJobEnd =>        listener.onJobEnd(jobEnd)      case taskStart: SparkListenerTaskStart =>        listener.onTaskStart(taskStart)      //【中间略去很多其他事件】      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)      case _ => listener.onOtherEvent(event)    }  }}
代码语言:javascript
复制

SparkListenerBus继承了ListenerBus,实现了doPostEvent()方法,对事件进行匹配,并调用监听器的处理方法。如果无法匹配到事件,则调用onOtherEvent()方法。

SparkListenerBus支持的监听器都是SparkListenerInterface的子类,事件则是SparkListenerEvent的子类。下面来了解一下。

SparkListenerInterface与SparkListenerEvent特征

在SparkListenerInterface特征中,分别定义了处理每一个事件的处理方法,统一命名为“on+事件名称”,代码很简单,就不再贴出来了。

SparkListenerEvent是一个没有抽象方法的特征,类似于Java中的标记接口(marker interface),它唯一的用途就是标记具体的事件类。事件类统一命名为“SparkListener+事件名称”,并且都是Scala样例类。我们可以简单看一下它们的部分代码。

代码#5.6 - o.a.s.scheduler.SparkListenerEvent特征及其部分子类

代码语言:javascript
复制
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
  protected[spark] def logEvent: Boolean = true
}

@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskEnd(
    stageId: Int,
    stageAttemptId: Int,
    taskType: String,
    reason: TaskEndReason,
    taskInfo: TaskInfo,
    @Nullable taskMetrics: TaskMetrics)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerJobStart(
    jobId: Int,
    time: Long,
    stageInfos: Seq[StageInfo],
    properties: Properties = null)
  extends SparkListenerEvent {
  val stageIds: Seq[Int] = stageInfos.map(_.stageId)
}
// ...
代码语言:javascript
复制

总结

本文介绍了Spark事件总线机制的概况,并通过阅读ListenerBus与SparkListenerBus相关的源码,对Spark Core事件总线的规范有了初步的了解。下一篇文章会重点分析SparkListenerBus的实现类AsyncEventQueue,以及利用它的LiveListenerBus,从而深入理解事件总线的设计细节。

— THE END —

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Spark事件总线概述
  • ListenerBus特征
    • addListener()与removeListener()方法
      • doPostEvent()方法
        • postToAll()方法
        • SparkListenerBus特征
          • SparkListenerInterface与SparkListenerEvent特征
          • 总结
          相关产品与服务
          事件总线
          腾讯云事件总线(EventBridge)是一款安全,稳定,高效的云上事件连接器,作为流数据和事件的自动收集、处理、分发管道,通过可视化的配置,实现事件源(例如:Kafka,审计,数据库等)和目标对象(例如:CLS,SCF等)的快速连接,当前 EventBridge 已接入 100+ 云上服务,助力分布式事件驱动架构的快速构建。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档