
[Source Code Walkthrough] LiveListenerBus

Author: 857技术社区 | Published 2022-07-07 | Column: 857-Bigdata

Overview

Code language: scala
/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 */

Why use an event-listener mechanism? Imagine that Spark delivered its event notifications through ordinary Scala function calls. As the cluster grows, the number of calls grows with it; eventually the JVM's limit on the number of threads would slow down updates to the monitoring data, or even make it impossible to serve monitoring data to users at all. Function calls are also mostly synchronous, so threads would block and stay occupied for long stretches of time.

What does the event-listener mechanism buy us? Function calls are replaced by posting (delivering) events. Event handling is asynchronous: the current thread can carry on with its own work, threads in the pool can be reused, and the overall concurrency of the system increases substantially. Posted events are buffered, and a dispatcher takes them out of the buffer and hands them to the listeners registered for that event so that the monitoring data gets updated.
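
To make the idea concrete, here is a minimal, self-contained sketch of the pattern (illustrative only, not Spark's actual implementation): the caller posts an event into a bounded queue and returns immediately, while a dedicated dispatch thread delivers events to the registered listeners.

Code language: scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}
import scala.collection.JavaConverters._

object TinyEventBus {
  sealed trait Event
  final case class JobStarted(id: Int) extends Event

  private val queue = new LinkedBlockingQueue[Event](10000)          // bounded buffer
  private val listeners = new CopyOnWriteArrayList[Event => Unit]()  // thread-safe listener list

  def addListener(l: Event => Unit): Unit = listeners.add(l)

  // The caller returns as soon as the event is buffered; no listener code runs here.
  def post(e: Event): Unit = queue.put(e)

  private val dispatcher = new Thread(() => {
    while (true) {
      val e = queue.take()                     // blocks until an event is available
      listeners.asScala.foreach(l => l(e))     // deliver to every registered listener
    }
  }, "tiny-event-bus-dispatcher")
  dispatcher.setDaemon(true)
  dispatcher.start()
}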

Queues

Asynchronous event queue

The asynchronous event queue is built mainly on a LinkedBlockingQueue[SparkListenerEvent], with a default capacity of 10000.

The event-dispatch thread keeps taking events from the LinkedBlockingQueue. An event stays in the LinkedBlockingQueue for a while; once the thread has finished processing it, it is removed from the queue.

Code language: scala
// LiveListenerBus.scala
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

// AsyncEventQueue.scala
// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
  conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))  // default 10000
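
The capacity read here comes from the configuration key spark.scheduler.listenerbus.eventqueue.capacity. If the driver log reports dropped events under heavy load, the queue can be enlarged when building the SparkConf; a small example:

Code language: scala
import org.apache.spark.SparkConf

// Example only: raise the listener-bus queue capacity from the default 10000.
val conf = new SparkConf()
  .setAppName("listener-bus-demo")
  .set("spark.scheduler.listenerbus.eventqueue.capacity", "20000")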

Listener queues

Code language: scala
  /** Add a listener to queue shared by all non-internal listeners. */
  /**
   * Called mainly from SparkContext: a user can register a listener in code, or declare one
   * in the configuration, in which case it is instantiated by reflection
   * [implemented in SparkContext.setupAndStartListenerBus()].
   */
  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, SHARED_QUEUE)
  }

  /** Add a listener to the executor management queue. */
  /**
   * Registers HeartbeatReceiver (which listens for executor add/remove events and uses a
   * thread to periodically check each executor's heartbeat, killing executors that time out),
   * and the ExecutorAllocationListener added by ExecutorAllocationManager (which matches the
   * total number of tasks against executor parallelism to dynamically add or remove executors;
   * requires configuration and is disabled by default).
   */
  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
  }

  /** Add a listener to the application status queue. */
  /**
   * Registers AppStatusListener, which provides AppStatusStore with the Job/Stage/Task data
   * shown in the UI, and SQLAppStatusListener, which provides SQLAppStatusStore with the data
   * shown in the SQL UI.
   */
  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, APP_STATUS_QUEUE)
  }

  /** Add a listener to the event log queue. */
  /** Writes the received events to the event log store as JSON; requires configuration and is disabled by default. */
  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EVENT_LOG_QUEUE)
  }

  /**
   * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
   * of each other (each one uses a separate thread for delivering events), allowing slower
   * listeners to be somewhat isolated from others.
   * The methods above all delegate to this one.
   * In addition, the StreamingQueryListenerBus used by Spark Structured Streaming joins a
   * "streams" queue through addToQueue() (it listens for the stream start, progress, and
   * termination events; the progress event exposes detailed progress for the stream,
   * including its name, id, watermark, source offsets, sink offsets, etc.).
   */
  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }

    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)

      case None =>
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }
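
For reference, this is how a user-defined listener typically ends up in the shared queue; a small sketch assuming an existing SparkContext `sc` (the listener class name is made up for illustration):

Code language: scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Illustrative listener: just logs job completions.
class JobEndLogger extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}

// 1) Programmatically: SparkContext.addSparkListener -> LiveListenerBus.addToSharedQueue
sc.addSparkListener(new JobEndLogger)

// 2) Declaratively: setupAndStartListenerBus() instantiates the classes named in
//    spark.extraListeners by reflection when the SparkContext starts, e.g.
//    spark-submit --conf spark.extraListeners=com.example.JobEndLogger ...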

Event posting

SparkListenerEvent event types

SparkListenerEvent is a trait. Some of its subclasses are shown below; these events can be displayed (e.g. in the UI) and recorded (e.g. in the event log).

Code language: scala
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
  /* Whether output this event to the event log */
  protected[spark] def logEvent: Boolean = true
}

@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
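
Because SparkListenerEvent is only a trait, new event types can be defined; anything a listener does not explicitly handle is routed to onOtherEvent (see doPostEvent later in this post). A sketch with invented names (note that posting a custom event requires access to the private[spark] listener bus, so this pattern is mainly used inside Spark's own modules):

Code language: scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// A hypothetical custom event type.
case class MyCheckpointEvent(checkpointId: Long, path: String) extends SparkListenerEvent

// Events with no dedicated callback are delivered through onOtherEvent.
class CheckpointTracker extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case MyCheckpointEvent(id, path) => println(s"checkpoint $id written to $path")
    case _ => // ignore other event types
  }
}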

The public event-posting interface: post()

External components such as SparkContext, DAGScheduler, and CoarseGrainedSchedulerBackend all submit events to the bus through post().

The posting process:

  • If the bus has already started, postToQueues() puts the event into the matching named queues.
  • If the bus has not started yet, the event is kept in a ListBuffer[SparkListenerEvent]; when the bus starts, the buffered events are delivered and the buffer is cleared.

The posting code is shown below.

Code language: scala
// SparkContext calls start() to start the bus
  def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
    // mark the bus as started
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("LiveListenerBus already started.")
    }

    this.sparkContext = sc
    // once the bus is started, deliver the buffered queuedEvents and then clear the buffer
    queues.asScala.foreach { q =>
      q.start(sc)
      queuedEvents.foreach(q.post)
    }
    queuedEvents = null
    metricsSystem.registerSource(metrics)
  }
  
  // post() checks whether the bus has started and delivers the event accordingly
    def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }

    metrics.numEventsPosted.inc()

    // If the event buffer is null, it means the bus has been started and we can avoid
    // synchronization and post events directly to the queues. This should be the most
    // common case during the life of the bus.
    // the bus has started and queuedEvents has been set to null, so post directly to the queues
    if (queuedEvents == null) {
      postToQueues(event)
      return
    }

    // Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
    // calling start() picks up the new event.
    synchronized {
      if (!started.get()) {
        // the bus has not started yet, so buffer the event first
        queuedEvents += event
        return
      }
    }

    // If the bus was already started when the check above was made, just post directly to the queues.
    // deliver the event
    postToQueues(event)
  }

How DAGScheduler posts events

Updating monitoring metrics

Code language: scala
def executorHeartbeatReceived(
      execId: String,
      // (taskId, stageId, stageAttemptId, accumUpdates)
      accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
      blockManagerId: BlockManagerId): Boolean = {
    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
    blockManagerMaster.driverEndpoint.askSync[Boolean](
      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
  }
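
On the receiving side, a listener can pick up these heartbeat-driven updates through onExecutorMetricsUpdate; a minimal illustrative consumer:

Code language: scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Illustrative only: print a summary of the accumulator updates carried by each heartbeat.
class HeartbeatMetricsLogger extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate): Unit = {
    update.accumUpdates.foreach { case (taskId, stageId, stageAttemptId, accums) =>
      println(s"executor ${update.execId}: task $taskId (stage $stageId.$stageAttemptId) " +
        s"reported ${accums.size} accumulator updates")
    }
  }
}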

Task start and result fetching

Code language: scala
  private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
    // Note that there is a chance that this task is launched after the stage is cancelled.
    // In that case, we wouldn't have the stage anymore in stageIdToStage.
    val stageAttemptId =
      stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
  }
  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
  }
  private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
    listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
  }
  .........................................

Stage start and completion

Code language: scala
  /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")

    // First figure out the indexes of partition ids to compute.
    // 1. indexes of this stage's partitions that have not been computed yet
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    // 2. take the scheduling pool, job group, description, etc. from the associated ActiveJob
    val properties = jobIdToActiveJob(jobId).properties
    // 3. add this stage to the runningStages set
    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    // 4. depending on the stage type, notify the OutputCommitCoordinator and compute partition locations
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

Job start and completion

Code language: scala
private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) // 1. create the final stage (ResultStage)
    } catch {
      case e: BarrierJobSlotsNumberCheckFailed =>
        logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
          "than the total number of slots in the cluster currently.")
        // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
        val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
          new BiFunction[Int, Int, Int] {
            override def apply(key: Int, value: Int): Int = value + 1
          })
        if (numCheckFailures <= maxFailureNumTasksCheck) {
          messageScheduler.schedule(
            new Runnable {
              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
                partitions, callSite, listener, properties))
            },
            timeIntervalNumTasksCheck,
            TimeUnit.SECONDS
          )
          return
        } else {
          // Job failed, clear internal data.....
          ...
          
  private[scheduler] def cleanUpAfterSchedulerStop() {
    for (job <- activeJobs) {
      val error =
        new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
      job.listener.jobFailed(error)
      // Tell the listeners that all of the running stages have ended.  Don't bother
      // cancelling the stages because if the DAG scheduler is stopped, the entire application
      // is in the process of getting stopped.
      val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
      // The `toArray` here is necessary so that we don't iterate over `runningStages` while
      // mutating it.
      runningStages.toArray.foreach { stage =>
        markStageAsFinished(stage, Some(stageFailedMessage))
      }
      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
    }
  }

AsyncEventQueue: asynchronous event processing

AsyncEventQueue class hierarchy

AsyncEventQueue method list

AsyncEventQueue functionality

  • dispatchThread: AsyncEventQueue contains a single dispatchThread that keeps calling dispatch() -> postToAll() -> doPostEvent() to process the events in eventQueue, so that every registered listener reacts to each event (see the sketch below).
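
A simplified sketch of what that loop does conceptually (the real AsyncEventQueue additionally tracks metrics, uses a poison-pill event to shut down cleanly, and counts dropped events when the queue is full):

Code language: scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Conceptual sketch of the dispatch loop, not Spark's actual AsyncEventQueue.
class SketchQueue[E](capacity: Int, listeners: Seq[E => Unit]) {
  private val eventQueue = new LinkedBlockingQueue[E](capacity)
  private val stopped = new AtomicBoolean(false)

  private val dispatchThread = new Thread(() => {
    while (!stopped.get() || !eventQueue.isEmpty) {
      val event = eventQueue.poll(100, TimeUnit.MILLISECONDS)
      if (event != null) listeners.foreach(_.apply(event))  // postToAll -> doPostEvent
    }
  }, "sketch-dispatcher")
  dispatchThread.setDaemon(true)

  def start(): Unit = dispatchThread.start()
  def post(event: E): Unit = if (!stopped.get()) eventQueue.offer(event)  // returns false (drops) if full
  def stop(): Unit = { stopped.set(true); dispatchThread.join() }
}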

The doPostEvent implementation in AsyncEventQueue's parent class

StreamingListenerBus and StreamingQueryListenerBus override doPostEvent() so that they only care about, and handle, streaming-related events.

As the method shows, each event is pattern-matched on its type and then dispatched to the corresponding callback of SparkListenerInterface.

Code language: scala
protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
      case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
      case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
        listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
      case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
        listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
      case executorBlacklisted: SparkListenerExecutorBlacklisted =>
        listener.onExecutorBlacklisted(executorBlacklisted)
      case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
        listener.onExecutorUnblacklisted(executorUnblacklisted)
      case nodeBlacklisted: SparkListenerNodeBlacklisted =>
        listener.onNodeBlacklisted(nodeBlacklisted)
      case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
        listener.onNodeUnblacklisted(nodeUnblacklisted)
      case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
      case _ => listener.onOtherEvent(event)
    }
  }

AsyncEventQueue event-processing flow

SparkListenerInterface analysis

The streaming listeners will be analyzed in detail in a later post.

AppStatusListener

The Job, Stage, and Task pages of the Spark UI call methods provided by AppStatusStore to read the RDD/task-related information stored in the kvstore.

Code language: scala
/**
 * A Spark listener that writes application information to a data store. The types written to the
 * store are defined in the `storeTypes.scala` file and are based on the public REST API.
 *
 * @param lastUpdateTime When replaying logs, the log's last update time, so that the duration of
 *                       unfinished tasks can be more accurately calculated (see SPARK-21922).
 */
private[spark] class AppStatusListener(
    kvstore: ElementTrackingStore,
    conf: SparkConf,
    live: Boolean,
    lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
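
Outside of the UI, the same data can also be read programmatically: SparkContext exposes a SparkStatusTracker that, in recent Spark versions, is backed by the same AppStatusStore this listener populates. A small example, assuming an existing SparkContext `sc`:

Code language: scala
// Read job/stage status through the public SparkStatusTracker,
// which queries the store that AppStatusListener keeps up to date.
val tracker = sc.statusTracker
tracker.getActiveJobIds().foreach { jobId =>
  tracker.getJobInfo(jobId).foreach { info =>
    println(s"job $jobId: status=${info.status()} stages=${info.stageIds().mkString(",")}")
  }
}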

SQLAppStatusListener

The SQL page of the Spark UI calls methods provided by SQLAppStatusStore to read, from the kvstore, information about SparkPlan physical plans (the actual SQL execution flow).

Code language: scala
class SQLAppStatusListener(
    conf: SparkConf,
    kvstore: ElementTrackingStore,
    live: Boolean) extends SparkListener with Logging {

KVStore will be covered in a later update~

Good night~
