
Driver Process Startup

solve · 2019-10-30 (originally published 2018-06-08)

Preface

Taking the Standalone cluster deploy mode as an example, this article traces how the driver is launched, from the point of view of the source code.

  1. When we run the spark-submit script, a look inside the script shows that it launches a submission process whose main class is org.apache.spark.deploy.SparkSubmit.
  2. That process's main() method dispatches into submit():
```scala
def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }
```
  3. The submit() method mainly comes down to two calls: prepareSubmitEnvironment() and runMain().
```scala
private def submit(args: SparkSubmitArguments): Unit = {
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              // scalastyle:off println
              printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
              // scalastyle:on println
              exitFn(1)
            } else {
              throw e
            }
        }
      } else {
        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      }
    }
    // ... the rest of submit() is omitted here; it ultimately calls doRunMain()
  }
```
  • prepareSubmitEnvironment(): its job is summed up by the four points in its own doc comment:

Prepare the environment for submitting an application. This returns a 4-tuple: (1) the arguments for the child process, (2) a list of classpath entries for the child, (3) a map of system properties, and (4) the main class for the child

  • That is, it prepares the submission environment for a child process (we'll call this child process the "client" process here, though it is not the AppClient usually meant by that term): (1) the arguments for the child process, (2) the classpath entries for the child, (3) a map of system properties, and (4) the child's main class, i.e. its entry point.

The following snippet shows that in standalone cluster mode (on the legacy, non-REST path) the client process's main class is set to org.apache.spark.deploy.Client:

```scala
    ...
   if (args.isStandaloneCluster) {
      if (args.useRest) {
        childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // In legacy standalone cluster mode, use Client as a wrapper around the user class
        childMainClass = "org.apache.spark.deploy.Client"
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }
    ...
```
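To make the branch above concrete, here is a hypothetical example of what the legacy (non-REST) path assembles. The command line, class name, and jar path are invented purely for illustration:

```scala
// Hypothetical illustration. For a submission such as:
//   spark-submit --master spark://host:7077 --deploy-mode cluster \
//     --driver-memory 1g --driver-cores 2 --class com.example.Main /path/app.jar
// the legacy branch above would end up with roughly:
val childMainClass = "org.apache.spark.deploy.Client"
val childArgs = Seq(
  "--memory", "1g",        // from --driver-memory
  "--cores", "2",          // from --driver-cores
  "launch",                // the Client sub-command
  "spark://host:7077",     // args.master
  "/path/app.jar",         // args.primaryResource
  "com.example.Main")      // args.mainClass
```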
  • runMain(): as the name suggests, this runs a main method, namely the main method of the client class chosen above. Notably, it invokes main via reflection inside the current JVM rather than forking a new process; a minimal sketch of the pattern follows.
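A minimal sketch of that reflection pattern, simplified: the real runMain also builds a child classloader from the classpath entries and applies the system properties before invoking anything.

```scala
// Minimal sketch: invoke a class's main() reflectively in the current JVM,
// the way runMain launches the client class. Error handling is omitted.
def invokeMain(childMainClass: String, childArgs: Seq[String]): Unit = {
  val mainClass = Class.forName(childMainClass) // e.g. "org.apache.spark.deploy.Client"
  val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
  // main is static (a Scala object's main compiles to a static method),
  // so the receiver is null; the args array is the single parameter.
  mainMethod.invoke(null, childArgs.toArray)
}
```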
  4. Next, let's look at the main method of org.apache.spark.deploy.Client:
```scala
object Client {
  def main(args: Array[String]) {
    // scalastyle:off println
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println

    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)

    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
      conf.set("spark.akka.logLifecycleEvents", "true")
    }
    conf.set("spark.rpc.askTimeout", "10")
    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, _, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }
}
```
  • This code is fairly simple: it mostly sets configuration. The essential part is that it creates a ClientEndpoint and hands it the masterEndpoints; from here on, the client talks to the master exclusively through this ClientEndpoint, so let's look at what ClientEndpoint does (a simplified skeleton of the endpoint lifecycle is sketched below).
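For orientation, this is roughly the lifecycle involved. The skeleton below is illustrative only, not the real implementation: onStart() runs once when the endpoint is registered with the RpcEnv, and receive handles messages delivered afterwards.

```scala
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}

// Illustrative skeleton only; the real ClientEndpoint in
// org.apache.spark.deploy.Client carries much more state.
class ClientEndpointSketch(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

  // Invoked once, right after rpcEnv.setupEndpoint("client", ...) registers us;
  // this is where the driver submission is kicked off.
  override def onStart(): Unit = { /* build the DriverDescription, contact masters */ }

  // Replies forwarded back to `self` arrive here.
  override def receive: PartialFunction[Any, Unit] = {
    case reply => // e.g. a SubmitDriverResponse from the master
  }
}
```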
  5. Now let's look at ClientEndpoint's onStart() method, shown below:
```scala
override def onStart(): Unit = {
    driverArgs.cmd match {
      case "launch" =>
        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
        //       truncate filesystem paths similar to what YARN does. For now, we just require
        //       people call `addJar` assuming the jar is in the same directory.
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

        val classPathConf = "spark.driver.extraClassPath"
        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val libraryPathConf = "spark.driver.extraLibraryPath"
        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val extraJavaOptsConf = "spark.driver.extraJavaOptions"
        val extraJavaOpts = sys.props.get(extraJavaOptsConf)
          .map(Utils.splitCommandString).getOrElse(Seq.empty)
        val sparkJavaOpts = Utils.sparkJavaOpts(conf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts
        val command = new Command(mainClass,
          Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
          sys.env, classPathEntries, libraryPathEntries, javaOpts)

        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
        ayncSendToMasterAndForwardReply[SubmitDriverResponse](
          RequestSubmitDriver(driverDescription))

      case "kill" =>
        val driverId = driverArgs.driverId
        ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
    }
  }
```
  • The key points in this code:

  1. val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" specifies the class our driver process will be started with.
  2. ayncSendToMasterAndForwardReply[SubmitDriverResponse](RequestSubmitDriver(driverDescription)) packs all the driver launch parameters into a RequestSubmitDriver message and submits it to the master; the master's response comes back asynchronously (the helper is sketched below).
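For reference, the forwarding helper works roughly as condensed below. The members it references (masterEndpoints, self, logWarning, forwardMessageExecutionContext) are fields of ClientEndpoint, and the method name follows the spelling used in the snippet above; this is a simplified excerpt, not the full implementation.

```scala
import scala.reflect.ClassTag
import scala.util.{Failure, Success}

// Condensed sketch: ask each master and forward the (future) reply to `self`,
// so it arrives asynchronously in this endpoint's receive method.
private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
  for (masterEndpoint <- masterEndpoints) {
    masterEndpoint.ask[T](message).onComplete {
      case Success(v) => self.send(v) // reply is re-delivered to receive
      case Failure(e) => logWarning(s"Error sending messages to master $masterEndpoint", e)
    }(forwardMessageExecutionContext) // a dedicated executor in the source
  }
}
```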

That's all for this article. What happens after this point is already covered by plenty of blog posts, so I won't ramble on. I wrote this piece mainly because I was once asked how the driver starts, and I couldn't find an explanation online that traces it clearly back to the source — though perhaps I simply didn't find the right one.
