Spark 2.4.0 Source Code Analysis: WordCount, Event Loop Processor (Part 3)


Sequence diagram

Main content

  • Understand how the DAG event loop processor handles events

Source code analysis

DAGScheduler.submitJob

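  • submitJob first validates the requested partition IDs and allocates a job ID. If the job has no partitions, it returns immediately. Otherwise it wraps the job in a JobSubmitted event, submits that event through DAGSchedulerEventProcessLoop.post, and returns the JobWaiter without waiting for the job to run.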
  /**
   * Submit an action job to the scheduler.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @return a JobWaiter object that can be used to block until the job finishes executing
   *         or can be used to cancel the job.
   *
   * @throws IllegalArgumentException when partitions ids are illegal
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
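The control flow of submitJob can be modeled without Spark. The sketch below is a simplified stand-in. MiniScheduler and SimpleJobSubmitted are hypothetical names, not Spark classes. The sketch keeps three things from the original: the partition check, the AtomicInteger job-ID counter, and the early return for empty jobs. It replaces eventProcessLoop.post with a plain LinkedBlockingDeque and leaves out JobWaiter, CallSite, and the properties clone.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified event standing in for Spark's JobSubmitted.
final case class SimpleJobSubmitted(jobId: Int, partitions: Array[Int])

// Hypothetical mini scheduler mirroring the shape of submitJob:
// validate partition ids, allocate a job id, skip empty jobs, post an event.
class MiniScheduler(numPartitions: Int) {
  private val nextJobId = new AtomicInteger(0)
  // Plays the role of the event loop's queue in this sketch.
  val queue = new LinkedBlockingDeque[SimpleJobSubmitted]()

  /** Returns the allocated job id; throws for out-of-range partition ids. */
  def submitJob(partitions: Seq[Int]): Int = {
    // Reject any partition id outside [0, numPartitions), before allocating an id.
    partitions.find(p => p >= numPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        s"Attempting to access a non-existent partition: $p. " +
          s"Total number of partitions: $numPartitions")
    }
    val jobId = nextJobId.getAndIncrement()
    // A job with zero partitions still consumes an id, but nothing is posted.
    if (partitions.nonEmpty) {
      queue.put(SimpleJobSubmitted(jobId, partitions.toArray))
    }
    jobId
  }
}
```

As in submitJob, an empty partition list still consumes a job ID but posts nothing. An invalid partition is rejected before any ID is allocated.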

DAGSchedulerEventProcessLoop.post

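  • DAGSchedulerEventProcessLoop extends EventLoop[DAGSchedulerEvent].
  • DAGSchedulerEventProcessLoop does not define post(), so a call to it invokes the inherited EventLoop.post(). That method only puts the event into the event queue; the event thread processes it later.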
  /**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    eventQueue.put(event)
  }
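post only calls put on a queue created without a capacity, so it returns right away. The following minimal, single-threaded sketch shows that producer side. It assumes a hypothetical QueueOnlyLoop class with no event thread attached.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}

// Hypothetical sketch of EventLoop's producer side: post() only enqueues.
// No consumer thread exists here; takeNext() imitates one loop iteration.
class QueueOnlyLoop[E] {
  // Without an explicit capacity, LinkedBlockingDeque is bounded only by
  // Integer.MAX_VALUE, so put() does not block in practice.
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  def post(event: E): Unit = eventQueue.put(event)

  def pending: Int = eventQueue.size()

  // What the event thread does each iteration: take the oldest event (FIFO).
  def takeNext(): E = eventQueue.take()
}
```

This is why submitJob can return its JobWaiter at once. The events are handled later, on the event thread, in the order they were posted.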

EventLoop.start

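  • eventProcessLoop.start() is called at the end of the DAGScheduler class body, so it runs as part of the constructor.
  • DAGSchedulerEventProcessLoop does not define start(), so this call goes to the inherited EventLoop.start(). In other words, EventLoop.start() has already run by the time a DAGScheduler instance has been created.
  • start() first checks that the loop has not been stopped. It then calls onStart() and finally eventThread.start(), which executes the thread's run() method on the new thread.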
  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }
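The comment in start() relies on a Java Memory Model rule: whatever a thread does before calling Thread.start() happens-before every action of the started thread. The hypothetical StartOrderingDemo class below sketches this rule. onStart() writes a plain, non-volatile field, and the event thread is still guaranteed to see the write because onStart() runs before eventThread.start().

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical demo of the ordering in EventLoop.start(): onStart() runs
// before eventThread.start(), so its writes are visible to the event thread.
class StartOrderingDemo {
  private val stopped = new AtomicBoolean(false)
  // Deliberately NOT volatile: visibility comes from the happens-before
  // edge created by Thread.start(), not from the field itself.
  private var initialized = false
  @volatile private var seenByEventThread: Option[Boolean] = None

  private val eventThread = new Thread("demo-event-loop") {
    setDaemon(true)
    override def run(): Unit = {
      // Stands in for the first onReceive call on the event thread.
      seenByEventThread = Some(initialized)
    }
  }

  private def onStart(): Unit = { initialized = true }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException("demo-event-loop has already been stopped")
    }
    onStart()           // 1. initialize on the caller's thread
    eventThread.start() // 2. only then launch the event thread
  }

  /** Starts the thread, waits for it to finish, and returns what it observed. */
  def runOnce(): Option[Boolean] = {
    start()
    eventThread.join()
    seenByEventThread
  }
}
```

If the two calls in start() were swapped, the event thread could run before onStart() and observe initialized == false. That race is exactly what the original comment guards against.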

EventLoop

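  • Events are stored in eventQueue, a blocking queue backed by LinkedBlockingDeque. It is created without a capacity, so it is effectively unbounded.
  • Once start() has launched the event thread, run() loops until the loop is stopped. On each iteration, eventQueue.take() blocks until an event is available, and the event is passed to onReceive(). EventLoop declares onReceive() without implementing it, so the subclass implementation, DAGSchedulerEventProcessLoop.onReceive(), is what runs.
  • A non-fatal exception thrown by onReceive() is passed to onError(). If onError() itself throws, the error is only logged. An InterruptedException ends the loop even if events are still queued.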
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  // Exposed for testing.
  private[spark] val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }
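The same loop can be reproduced in a small, self-contained form. MiniEventLoop and RecordingLoop below are hypothetical and simplified relative to the EventLoop code above: there is no onStart hook, no logging, and onError is not wrapped in a second try. The sketch shows two behaviors. A failing event is routed to onError while later events are still processed, and interrupting the thread ends the loop.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Hypothetical, simplified re-creation of the EventLoop pattern (not Spark's class):
// one daemon thread takes events off a blocking queue and hands them to onReceive.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until an event arrives
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => onError(e) // the loop keeps running afterwards
          }
        }
      } catch {
        case _: InterruptedException => // stop() interrupted take(): leave the loop
      }
    }
  }

  def start(): Unit = eventThread.start()

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      eventThread.join()
    }
  }

  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

// Hypothetical subclass for the demo: records handled events, fails on "boom".
class RecordingLoop(done: CountDownLatch) extends MiniEventLoop[String]("demo-loop") {
  @volatile var log: Vector[String] = Vector.empty

  override protected def onReceive(event: String): Unit = {
    if (event == "boom") throw new RuntimeException("handler failed")
    log = log :+ s"handled:$event"
    done.countDown()
  }

  override protected def onError(e: Throwable): Unit = {
    log = log :+ s"error:${e.getMessage}"
  }
}
```

Suppose "a", "boom", and "b" are posted in that order. The single event thread handles them strictly in sequence, so the log ends up as handled:a, error:handler failed, handled:b.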

DAGSchedulerEventProcessLoop.onReceive()

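  • Calls DAGSchedulerEventProcessLoop.doOnReceive(), which matches on the event type and dispatches the event to the corresponding handler. The call is wrapped in a timer. The timer context is stopped in a finally block, so handling time is recorded even when the handler throws.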
  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }
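Because of the try/finally around doOnReceive, every event is timed, including events whose handler throws. The sketch below reproduces that structure with SimpleTimer, a hypothetical timer built on System.nanoTime. It is not the metrics timer Spark uses. TimedHandler is likewise a hypothetical name.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a metrics timer: time() returns a context whose
// stop() records the elapsed nanoseconds.
class SimpleTimer {
  val samplesNanos: ArrayBuffer[Long] = ArrayBuffer.empty[Long]

  final class Context(startNanos: Long) {
    def stop(): Unit = { samplesNanos += (System.nanoTime() - startNanos); () }
  }

  def time(): Context = new Context(System.nanoTime())
}

// Mirrors the shape of onReceive: the timer context is stopped in finally,
// so a handler that throws is still measured (and the exception still propagates).
class TimedHandler(timer: SimpleTimer, handle: String => Unit) {
  def onReceive(event: String): Unit = {
    val timerContext = timer.time()
    try {
      handle(event)
    } finally {
      timerContext.stop()
    }
  }
}
```

The exception is not swallowed here. In the real loop, it propagates to EventLoop's run(), which passes it to onError().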

DAGSchedulerEventProcessLoop.doOnReceive()

  • A JobSubmitted event is handled by dagScheduler.handleJobSubmitted().
  • The following event types are supported:
    1) JobSubmitted
    2) MapStageSubmitted
    3) StageCancelled
    4) JobCancelled
    5) JobGroupCancelled
    6) AllJobsCancelled
    7) ExecutorAdded
    8) ExecutorLost
    9) WorkerRemoved
    10) BeginEvent
    11) SpeculativeTaskSubmitted
    12) GettingResultEvent
    13) CompletionEvent
    14) TaskSetFailed
    15) ResubmitFailedStages
  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }
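The dispatch in doOnReceive is ordinary Scala pattern matching over case classes and case objects. The sketch below is a trimmed, hypothetical version with three event types: MiniJobSubmitted, MiniJobCancelled, and MiniAllJobsCancelled. Its stand-in scheduler, MiniDagScheduler, has handlers that return descriptive strings instead of doing real work.

```scala
// Hypothetical, heavily trimmed event hierarchy in the style of DAGSchedulerEvent.
sealed trait MiniSchedulerEvent
final case class MiniJobSubmitted(jobId: Int) extends MiniSchedulerEvent
final case class MiniJobCancelled(jobId: Int, reason: Option[String]) extends MiniSchedulerEvent
case object MiniAllJobsCancelled extends MiniSchedulerEvent

// Stand-in for DAGScheduler: each handler only reports what it would do.
class MiniDagScheduler {
  def handleJobSubmitted(jobId: Int): String = s"submit job $jobId"
  def handleJobCancellation(jobId: Int, reason: Option[String]): String =
    s"cancel job $jobId (${reason.getOrElse("no reason")})"
  def doCancelAllJobs(): String = "cancel all jobs"
}

// The loop only routes: it destructures each event and calls the matching handler.
class MiniEventProcessLoop(dagScheduler: MiniDagScheduler) {
  def doOnReceive(event: MiniSchedulerEvent): String = event match {
    case MiniJobSubmitted(jobId)         => dagScheduler.handleJobSubmitted(jobId)
    case MiniJobCancelled(jobId, reason) => dagScheduler.handleJobCancellation(jobId, reason)
    case MiniAllJobsCancelled            => dagScheduler.doCancelAllJobs()
  }
}
```

Because MiniSchedulerEvent is sealed, the compiler can warn when an event type has no matching case. The handlers themselves stay in the scheduler class.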

end

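Original content statement: this article was published on the Tencent Cloud+ Community with the author's authorization. Reproduction without permission is prohibited.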
