前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark源码分析 之 Driver和Excutor是怎么跑起来的?(2.2.0版本)

Spark源码分析 之 Driver和Excutor是怎么跑起来的?(2.2.0版本)

作者头像
用户1154259
发布2018-01-17 11:41:24
5760
发布2018-01-17 11:41:24
举报

今天抽空回顾了一下Spark相关的源码,本来想要了解一下Block的管理机制,但是看着看着就回到了SparkContext的创建与使用。正好之前没有正式的整理过这部分的内容,这次就顺带着回顾一下。

更多内容参考:我的大数据之路

Spark作为目前最流行的大数据计算框架,已经发展了几个年头了。版本也从我刚接触的1.6升级到了2.2.1。由于目前工作使用的是2.2.0,所以这次的分析也就从2.2.0版本入手了。

涉及的内容主要有:

  • Standalone模式中的Master与Worker
  • client、driver、excutor的关系

下面就按照顺序依次介绍一下。

Master与Worker

在最开始编程的时候,很少会涉及分布式,因为数据量也不大。后来随着硬件的发展cpu的瓶颈,开始流行多线程编程,基于多线程来加快处理速度;再后来,衍生出了网格计算、CPU与GPU的异构并行计算以及当时流行的mapreduce分布式计算。但是mapreduce由于存储以及计算流程的限制,spark开始流行起来。Spark凭借内存计算、强大的DAG回溯能力,快速的占领并行计算的风口。

那么并行计算肯定是需要分布式集群的,常见的集群管理方式,有Master-Slave模式、P2P模式等等。

比如Mysql的主从复制,就是Master-Slave模式;Elasticsearch的分片管理就是P2P模式。在Spark中有不同的部署方式,但是计算的模式都是Master-Slave模式,只不过Slave换了名字叫做worker而已。集群的部署模式如下所示:

流程就是用户以client的身份向master提交任务,master去worker上面创建执行任务的载体(driver和excutor)。

client、driver、excutor的关系

Master和Worker是服务器的部署角色,程序从执行上,则分成了client、driver、excutor三种角色。按照模式的不同,client和driver可能是同一个。以2.2.0版本的standalone模式来说,他们三个是独立的角色。client用于提交程序,初始化一些环境变量;driver用于生成task并追踪管理task的运行;excutor负责最终task的执行。

源码探索

总的流程可以总结为下面的一张图:

通过查看源码,来看一下

1 SparkContext创建调度器

在创建SparkContext的时候会创建几个核心的模块:

  1. DAGScheduler 面向job的调度器
  2. TaskScheduler 不同的集群模式,有不同的实现方式,如standalone下的taskschedulerImpl
  3. SchedulerBackend 不同的集群模式下,有不同的实现方式,如standalone下的StandaloneSchedulerBackend.负责向master发起注册
代码语言:javascript
复制
// 创建并启动调度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
...
// 启动调度器
_taskScheduler.start()

在createTaskSchduler中,根据master的不同,选择不同的实现方式,主要是在backend的实现上有差异:

代码语言:javascript
复制
master match {
      case "local" =>
        ...

      case LOCAL_N_REGEX(threads) =>
        ...

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        ...

      case SPARK_REGEX(sparkUrl) =>
        // 创建调度器
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        // 创建backend
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        // 把backend注入到schduler中
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        ...

      case masterUrl =>
        ...
    }

我们这里只看一下standalone模式的创建,就是创建了TaskSchedulerImpl和StandaloneSchedulerBackend的对象,另外初始化了调度器,根据配置选择调度模式,默认是FIFO:

代码语言:javascript
复制
def initialize(backend: SchedulerBackend) {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

2 TaskSchedulerImpl执行start方法

其实是执行了backend的start()方法

代码语言:javascript
复制
override def start() {
    backend.start()
    ...
  }

3 StandaloneSchedulerBackend执行start方法

这部分代码比较多,可以简化的看:

  • 封装command对象
  • 封装appDesc对象
  • 创建StandaloneAppClient对象
  • 执行start()方法

其中command中包含的那个类,就是excutor的实现类。

代码语言:javascript
复制
override def start() {
    //初始化参数
    ...
    
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
   ...
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    // 注意前面创建了一大堆的配置对象,主要就是那个class等信息
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    ...
  }

4 发起注册

核心的代码在StanaloneAppClient中,并在start()方法中启动了一个rpc的服务——ClientEndpoint

代码语言:javascript
复制
override def onStart(): Unit = {
  try {
    registerWithMaster(1)//发起注册
  } catch {
    ...
  }
}

registerWithMaster采用了异步发送请求连接master,只要有一个注册成功,其他的都会cancel。这里有时间可以做个小hello world玩玩看。

代码语言:javascript
复制
private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
    override def run(): Unit = {
      if (registered.get) {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry >= REGISTRATION_RETRIES) {
        markDead("All masters are unresponsive! Giving up.")
      } else {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerWithMaster(nthRetry + 1)
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
//发起注册
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  ...
  masterRef.send(RegisterApplication(appDescription, self))
  ...
}

5 Master接收到请求执行schedule方法

Master是一个常驻的进程,时刻监听别人发过来的消息。刚才client发送了一个RegisterApplication消息,忽略前面创建app的内容,直接执行了schedule方法:

代码语言:javascript
复制
case RegisterApplication(description, driver) =>
   // TODO Prevent repeated registrations from some driver
   if (state == RecoveryState.STANDBY) {
     // ignore, don't send response
   } else {
     ...
     schedule()
   }

6 Master发送launchDriver

发送lanunchDriver请求

代码语言:javascript
复制
private def schedule(): Unit = {
  ...
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    ...
    while (numWorkersVisited < numWorkersAlive && !launched) {
      ...
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        ...
      }
      ...
    }
  }
  startExecutorsOnWorkers()
}
//向worker发送launchDriver请求
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  ...
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  ...
}

7 Worker创建DriverRunner

代码语言:javascript
复制
case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      driver.start()

      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem

8 Master发送launchExcutor

第6步中最后有一个startExecutorsOnWorkers方法。

代码语言:javascript
复制
private def startExecutorsOnWorkers(): Unit = {
...
  for (app <- waitingApps if app.coresLeft > 0) {
    ...
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}

private def allocateWorkerResourceToExecutors(
   app: ApplicationInfo,
   assignedCores: Int,
   coresPerExecutor: Option[Int],
   worker: WorkerInfo): Unit = {
 ...
 for (i <- 1 to numExecutors) {
   ...
   launchExecutor(worker, exec)
   ...
 }
}

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  ...
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  ...
}

9 Worker创建ExcutorRunner

代码语言:javascript
复制
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
  ...
} else {
  try {
    ...
    val manager = new ExecutorRunner(
      appId,
      execId,
      appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
      cores_,
      memory_,
      self,
      workerId,
      host,
      webUi.boundPort,
      publicAddress,
      sparkHome,
      executorDir,
      workerUri,
      conf,
      appLocalDirs, ExecutorState.RUNNING)
    ...
  } catch {
...
  }
}

至此,Driver和Excutor就启动起来了.....

之后代码是怎么运行的,就且听下回分解把!

参考

  1. SparkContext
  2. spark worker解密
  3. 2.2.0源码
  4. 《Spark内核机制及性能调优》· 王家林
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-01-10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Master与Worker
  • client、driver、excutor的关系
  • 源码探索
    • 1 SparkContext创建调度器
      • 2 TaskSchedulerImpl执行start方法
        • 3 StandaloneSchedulerBackend执行start方法
          • 4 发起注册
            • 5 Master接收到请求执行schedule方法
              • 6 Master发送launchDriver
                • 7 Worker创建DriverRunner
                  • 8 Master发送launchExcutor
                    • 9 Worker创建ExcutorRunner
                    • 参考
                    相关产品与服务
                    GPU 云服务器
                    GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档