
Spark source code analysis: submit


The full journey of a job submitted with spark-submit.

The overall flow is roughly: bin/spark-submit → spark-class → SparkSubmit.main() → submit() → runMain() → org.apache.spark.deploy.yarn.Client.main() → run() → submitApplication() → monitorApplication().

spark-submit

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# Hand off to bin/spark-class, passing org.apache.spark.deploy.SparkSubmit as the class to run
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

The call first goes through spark-class with org.apache.spark.deploy.SparkSubmit as its first argument; spark-class does some basic environment setup (finding Java, assembling the classpath) and then launches that class. Running, say, ./bin/spark-submit --master yarn --deploy-mode cluster --class com.example.Main app.jar (com.example.Main and app.jar are made-up names, used as the running example below) therefore ends up starting SparkSubmit with those same arguments.

Now look at the main function of org.apache.spark.deploy.SparkSubmit:

  def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    // Match on the requested action; for a normal submission this takes the SUBMIT branch
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }
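SparkSubmitArguments derives the action from the command-line flags while parsing: --kill <submissionId> selects KILL, --status <submissionId> selects REQUEST_STATUS, and everything else defaults to SUBMIT. A minimal paraphrase of that dispatch (not the Spark source):

object ActionDispatch {
  sealed trait Action
  case object Submit extends Action
  case object Kill extends Action
  case object RequestStatus extends Action

  // --kill <id> => Kill, --status <id> => RequestStatus,
  // anything else => Submit (the default)
  def actionFor(args: Seq[String]): Action =
    if (args.contains("--kill")) Kill
    else if (args.contains("--status")) RequestStatus
    else Submit
}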

submit()

private def submit(args: SparkSubmitArguments): Unit = {
    // First set up the environment: appropriate environment variables, system properties, and application arguments
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
    .....
}

prepareSubmitEnvironment(args)

/**
   * Prepare the environment for submitting an application.
   * This returns a 4-tuple:
   *   (1) the arguments for the child process,
   *   (2) a list of classpath entries for the child,
   *   (3) a map of system properties, and
   *   (4) the main class for the child
   * Exposed for testing.
   * These are the settings passed to spark-submit, such as the deploy mode and environment variables. The method branches on the deploy mode; since jobs typically run on YARN, this walkthrough follows --master yarn --deploy-mode cluster.
   */
  private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
      : (Seq[String], Seq[String], Map[String, String], String) = {
          ......
         // The preceding lines are mostly validation; skip to the interesting part
     if (isYarnCluster) {
       // In yarn cluster mode the child process runs org.apache.spark.deploy.yarn.Client
      childMainClass = "org.apache.spark.deploy.yarn.Client"
      // Is this a Python application?
      if (args.isPython) {
        childArgs += ("--primary-py-file", args.primaryResource)
        if (args.pyFiles != null) {
          childArgs += ("--py-files", args.pyFiles)
        }
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
         // ...or an R application
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-r-file", mainFile)
        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
      } else {
          // Default case: a plain JVM application, the mode this walkthrough follows
        if (args.primaryResource != SPARK_INTERNAL) {
          childArgs += ("--jar", args.primaryResource)
        }
        childArgs += ("--class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }
      // Finally, return the assembled parameters
      (childArgs, childClasspath, sysProps, childMainClass)
  }
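To make the result concrete, here is a hedged illustration (the values are made up, not captured from a real run) of roughly what the 4-tuple contains for our running example, spark-submit --master yarn --deploy-mode cluster --class com.example.Main app.jar input output:

object ChildArgsExample {
  val childMainClass = "org.apache.spark.deploy.yarn.Client"
  val childArgs = Seq(
    "--jar",   "app.jar",           // args.primaryResource
    "--class", "com.example.Main",  // args.mainClass
    "--arg",   "input",             // each application argument becomes an --arg pair
    "--arg",   "output")
}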

Back in submit(), the next step is runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose), passing in exactly the values we just returned:

private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String,
      verbose: Boolean): Unit = {
    // Earlier lines configure the environment from these parameters, import system properties, and print logs
    ......
    try {
      // Load the class we were handed, i.e. org.apache.spark.deploy.yarn.Client
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        e.printStackTrace(printStream)
        ....... // exception handling elided
    }
    // Look up the main method of the loaded class
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    ......
    try {
      // Invoke the main method
      mainMethod.invoke(null, childArgs.toArray)
    }
}
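The reflection pattern itself is plain JVM reflection: resolve a class by name, look up its static main(Array[String]) method, and invoke it with a null receiver. A self-contained sketch (not Spark code; Greeter and ReflectiveLaunch are made-up names):

object Greeter {
  def main(args: Array[String]): Unit =
    println(s"Hello, ${args.mkString(" ")}")
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    // Same role as Utils.classForName(childMainClass)
    val mainClass  = Class.forName("Greeter")
    // main takes a single Array[String] parameter
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // null receiver because main is static; mirrors mainMethod.invoke(null, childArgs.toArray)
    mainMethod.invoke(null, Array("from", "reflection"))
  }
}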

Next, look at org.apache.spark.deploy.yarn.Client.main():

  def main(argStrings: Array[String]) {
    if (!sys.props.contains("SPARK_SUBMIT")) {
      logWarning("WARNING: This client is deprecated and will be removed in a " +
        "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
    }
    // Set an env variable indicating we are running in YARN mode.
    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf

    val args = new ClientArguments(argStrings, sparkConf)
    // to maintain backwards-compatibility
    if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
      sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
    }
    // After all the initialization and checks, construct a Client and call its run() method
    new Client(args, sparkConf).run()
  }
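Note that argStrings here is exactly the childArgs list assembled earlier by prepareSubmitEnvironment ("--jar", "--class", "--arg", ...); ClientArguments parses those pairs back into fields such as the user jar, the user class, and the user arguments.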

run()

def run(): Unit = {
    // Submit the application
    this.appId = submitApplication()
    if (!launcherBackend.isConnected() && fireAndForget) {
      ......
    } else {
      val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
      ......
    }
  }
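fireAndForget decides whether run() sticks around after submitting. In cluster mode it is driven by the spark.yarn.submit.waitAppCompletion setting; a standalone sketch of the decision (paraphrased from the surrounding source, with conf and isClusterMode standing in for the Client's fields):

import org.apache.spark.SparkConf

object FireAndForget {
  // When this is true, run() returns right after submitApplication()
  // instead of polling monitorApplication() until the app terminates.
  def decide(conf: SparkConf, isClusterMode: Boolean): Boolean =
    isClusterMode && !conf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
}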

submitApplication(): let's look at what submitting the application involves.

  def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    try {
      // launcherBackend was initialized earlier; it is an abstraction that reports state to the launcher process over a socket
      launcherBackend.connect()
      // Setup the credentials before doing anything else,
      // so we don't have issues at any point.
      setupCredentials()
      yarnClient.init(yarnConf)
      yarnClient.start()

      logInfo("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

      // Get a new application from our RM (via the YARN client API)
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()
      reportLauncherState(SparkAppHandle.State.SUBMITTED)
      launcherBackend.setAppId(appId.toString())

      // Verify whether the cluster has enough resources for our AM
      verifyClusterResources(newAppResponse)
      // Set up the appropriate contexts to launch our ApplicationMaster (AM)
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)
      // Finally, submit the application and return its appId to the caller
      logInfo(s"Submitting application ${appId.getId} to ResourceManager")
      yarnClient.submitApplication(appContext)
      appId
    } catch {
      case e: Throwable =>
        if (appId != null) {
          cleanupStagingDir(appId)
        }
        throw e
    }
  }
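Stripped of the Spark-specific context setup, the YARN side of this method is the standard YarnClient submission sequence. A minimal sketch against the Hadoop YARN client API (a bare sketch only; the real context must be filled in before submission, and error handling is omitted):

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object YarnSubmitSketch {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // Ask the ResourceManager for a new application and its id
    val newApp = yarnClient.createApplication()
    val appId  = newApp.getNewApplicationResponse.getApplicationId

    // Spark fills this context with the AM container spec via
    // createContainerLaunchContext / createApplicationSubmissionContext
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName("sketch")

    yarnClient.submitApplication(appContext)
    println(s"Submitted $appId")
  }
}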

Once the task is submitted, control returns to run(); as the code shows, it goes on to call monitorApplication(appId) to watch the application's progress:

/**
   * Report the state of an application until it has exited, either successfully or
   * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
   * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
   * or KILLED).
   *
   * @param appId ID of the application to monitor.
   * @param returnOnRunning Whether to also return the application state when it is RUNNING.
   * @param logApplicationReport Whether to log details of the application report every iteration.
   * @return A pair of the yarn application state and the final application state.
   */
  def monitorApplication(
      appId: ApplicationId,
      returnOnRunning: Boolean = false,
      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
    var lastState: YarnApplicationState = null
    while (true) {
      Thread.sleep(interval)
      val report: ApplicationReport =
        try {
          getApplicationReport(appId)
        } catch {
          case e: ApplicationNotFoundException =>
            logError(s"Application $appId not found.")
            return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
          case NonFatal(e) =>
            logError(s"Failed to contact YARN for application $appId.", e)
            return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)
        }
      val state = report.getYarnApplicationState

      if (logApplicationReport) {
        logInfo(s"Application report for $appId (state: $state)")

        // If DEBUG is enabled, log report details every iteration
        // Otherwise, log them every time the application changes state
        if (log.isDebugEnabled) {
          logDebug(formatReportDetails(report))
        } else if (lastState != state) {
          logInfo(formatReportDetails(report))
        }
      }

      if (lastState != state) {
        state match {
          case YarnApplicationState.RUNNING =>
            reportLauncherState(SparkAppHandle.State.RUNNING)
          case YarnApplicationState.FINISHED =>
            reportLauncherState(SparkAppHandle.State.FINISHED)
          case YarnApplicationState.FAILED =>
            reportLauncherState(SparkAppHandle.State.FAILED)
          case YarnApplicationState.KILLED =>
            reportLauncherState(SparkAppHandle.State.KILLED)
          case _ =>
        }
      }

      if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
        cleanupStagingDir(appId)
        return (state, report.getFinalApplicationStatus)
      }

      if (returnOnRunning && state == YarnApplicationState.RUNNING) {
        return (state, report.getFinalApplicationStatus)
      }

      lastState = state
    }

    // Never reached: the loop above only exits through the return statements
    throw new SparkException("While loop is depleted! This should never happen...")
  }
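That completes the chain: bin/spark-submit execs SparkSubmit; SparkSubmit prepares the child environment and reflectively invokes org.apache.spark.deploy.yarn.Client; the Client obtains an application from the YARN ResourceManager, submits the AM launch context, and then polls the application report (at the interval set by spark.yarn.report.interval, 1000 ms by default) until it reaches a terminal state. Everything after that point, starting the ApplicationMaster and scheduling executors, happens on the cluster side.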