前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark源码 —— 从 SparkSubmit 到 Driver启动

Spark源码 —— 从 SparkSubmit 到 Driver启动

作者头像
solve
发布2020-01-15 17:05:16
7760
发布2020-01-15 17:05:16
举报
文章被收录于专栏:大数据技术栈大数据技术栈

前言

本文主要是以笔记的整理方式写的, 仅以分享的方式供你阅读, 如有不对的地方欢迎指点错误。 读完本文可以学到: 当你用 shell 命令执行 spark-submit 之后, 到你的代码开始正式运行的一些列知识和细节, 恩...粗略的,要看的更细,可以按照流程自己撸源码哈~~~~

SparkSubmit

  • Spark-Submit脚本执行后, 会执行到org.apache.spark.deploy.SparkSubmit 所以我们从SparkSubmit 类开始, 以下是org.apache.spark.deploy.SparkSubmit简单的时序图

image.png

  • main方法:
    1. 解析我们传入的参数
    2. 根据 action 执行相对应的功能 当然这里我们的 Action 是:SparkSubmitAction.SUBMIT
代码语言:javascript
复制
  def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }
  • prepareSubmitEnvironment: 该方法主要是进行四个参数的解析:
代码语言:javascript
复制
···
private[deploy] def prepareSubmitEnvironment(args: 
SparkSubmitArguments)    : (Seq[String], Seq[String], Map[String, String], String) = {  // Return values  
val childArgs = new ArrayBuffer[String]() 
val childClasspath = new ArrayBuffer[String]()  
val sysProps = new HashMap[String, String]()  
var childMainClass = ""
 ...
 返回值
  (childArgs, childClasspath, sysProps, childMainClass)
  1. childArgs: 主要就是一些参数的
  2. childClasspath:这个就是classPath,jvm运行的class路径
  3. sysProps:一些系统参数
  4. childMainClass:接下来将要运行的主类
    • 如果是 Client模式,则该类就是我们自己编写的
    • 如果是Cluster 模式,则根据集群的不同返回不同的类: isStandaloneCluster:org.apache.spark.deploy.Client isYarnCluster: org.apache.spark.deploy.yarn.Client
  • runMain
    1. 加载 childClasspath下的 jars
    2. 设置系统参数 sysProps
    3. 运行 mainMethod,并传递参数

    mainMethod.invoke(null, childArgs.toArray)

至此Sumbmit任务完成,接下来我们以 Standalone Client为列, 进行org.apache.spark.deploy.Client相关源码分析

ClientEendpoint

image.png

  • 创建ClientEendpoint,并将Master注册到 ClientEendpoint
  • ClientEendpointonstart 方法被调起, 构建 DriverDescription, 并指定 Drive r的主类是org.apache.spark.deploy.worker.DriverWrapper 向 Master 申请RequestSubmitDriver(driverDescription)
  • Master 端收到请求后构建DriverInfo并加入到队列: waitingDrivers += driver drivers.add(driver)
  • master开始调度schedule(),并回复客户端申请成功的消息
  • 在Master的 schedule() 里面开始准备启动Driver

这里主要是将整条线理清楚了, 没有纠结细节, 如果有兴趣你可以按照这个线自己去看下源码 那么接下来就是启动Driver的过程了

Master调度

注意查看源码里面写的注释, 千万不要略过, 要不然本文就没啥意思了~~~

代码语言:javascript
复制
 private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) { return }
    // 将worker打乱,主要就是为了负载均衡
    val shuffledWorkers = Random.shuffle(workers)
    //筛选存活的 worker
    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
      // 遍历等待启动的Driver
      for (driver <- waitingDrivers) {
        //如果该worker内存和core都满足要求
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
         // 启动Driver
          launchDriver(worker, driver)
          waitingDrivers -= driver
        }
      }
    }
    //实际上还会去启动Executor,
    //但是我们目前不关注这里,略过
    startExecutorsOnWorkers()
  }

我们重点看下 launchDriver做了什么

代码语言:javascript
复制
  private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
   //将driver的信息记录下来
    worker.addDriver(driver)
    // driver现在知道他该在哪个worker启动了
    driver.worker = Some(worker)
    // 向worker节点发送 LaunchDriver
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    // 标记driver已经运行,实际是Driver可能还没启动呢!!!
    driver.state = DriverState.RUNNING
  }

launchDriver主要就是给 Worker 发送了启动 Driver 的消息 接下来就可以看看 Worker 端是怎么处理 LaunchDriver 这个消息的了。

Wroker调度

代码语言:javascript
复制
//节选代码,不要介意是 case 开头
 case LaunchDriver(driverId, driverDesc) => {
      logInfo(s"Asked to launch driver $driverId")
      //new一个 DriverRunner 
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
        
      //DriverRunner start
      drivers(driverId) = driver
      driver.start()
      //记录下消耗的资源
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
    }

重点看到 DriverRunner 的 Start 方法

代码语言:javascript
复制
new Thread("DriverRunner for " + driverId) {
    override def run(){
        ...
       //创建driver工作目录
       val driverDir = createWorkingDirectory()
       //下载一些jars
       val localJarFilename = downloadUserJar(driverDir)
        ...
        // 通过系统的指令创建 jvm进程,至此正式启动
        launchDriver(builder, driverDir, driverDesc.supervise)
    }
}.start()

driver的启动是通过一个 DriverRunner类开启一个线程异步启动的, 其过程没有什么特殊的地方, 至此 Driver 正式启动完成了。 接下来就是分析 Driver 主类的启动了 org.apache.spark.deploy.worker.DriverWrapper 而实际上,该类主要的作用就是会:

代码语言:javascript
复制
// Delegate to supplied main class
val clazz = Utils.classForName(mainClass)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])

调起我们自己写的主类方法, 至此,从我们敲下Spark-Submit之后, 终于执行到我们自己所写的代码了。

结言

Spark这部分源码流程比较简单清楚, 基本没有太多弯弯道道, 但是就算简单,那也是需要你自己去琢磨去看的, 否则你还是不能清楚的知道, 你的那个 spark-submit 敲下之后, 怎么就执行到你的代码了呢? OK,就到这里了, 如果没有意外, 本人应该会继续更新一系列的Spark源码文章, 如果你有兴趣,不妨关注一下,

最后,求赞 ~~~~

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • SparkSubmit
  • ClientEendpoint
  • Master调度
  • Wroker调度
  • 结言
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档