前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark提交任务入口源码分析

Spark提交任务入口源码分析

原创
作者头像
幽鸿
发布2020-03-30 18:44:36
7000
发布2020-03-30 18:44:36
举报
文章被收录于专栏:大数据-数据人生

我们平常在使用Spark进行提交代码的时候,一般是直接在装有spark客户端的机器上提交jar包执行。运行命令如下:

代码语言:javascript
复制
/data/opt/spark-2.3.1-bin-hadoop2.7/bin/spark-submit \
    --class com.tencent.th.dwd.t_dwd_evt_user_action_log_s \
    --total-executor-cores 300 --conf spark.sql.shuffle.partitions=500 \
    SparkV2-1.0.1.jar repartition_num=300

这里的执行入口spark-submit是什么呢?请看:

代码语言:javascript
复制
cat /data/opt/spark-2.3.1-bin-hadoop2.7/bin/spark-submit
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
export SPARK_HOME=/data/opt/spark-2.3.1-bin-hadoop2.7/
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

这里首先是初始化SPARK_HOME目录,然后执行编译后的类:org.apache.spark.deploy.SparkSubmit,那么这个入口类做了哪些工作呢?请看源代码:

代码语言:javascript
复制
def main(args: Array[String]): Unit = {
  //这里将传入的args参数进行初始化
  val appArgs = new SparkSubmitArguments(args)
  //判断参数是否有效合法
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  //判断执行类别
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

如果提交命令正确,开始执行spark:

代码语言:javascript
复制
/**
 * Submit the application using the provided parameters.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 */
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
  /**准备执行环境,这里主要得到了以下4个参数:
      (1)childArgs: 子进程的参数
      (2)childClasspath: 子进程的执行环境
      (3)sysProps:系统参数
      (4)childMainClass:子类名  
  **/
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  //开始执行Spark任务
  def doRunMain(): Unit = {
    //是否需要创建代理用户
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

执行的时候无论创建代理用户,最后都是调用 runMain方法开始执行,在runMain方法中,先是初始化判断参数是否verbose,然后是加载jar包:

代码语言:javascript
复制
for (jar <- childClasspath) {
  addJarToClasspath(jar, loader)
}

接下来做了两件核心的事情,第一个:加载要执行的类:

代码语言:javascript
复制
mainClass = Utils.classForName(childMainClass)

第二个,判断要执行的任务的入口:

代码语言:javascript
复制
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

最后一步,通过反射调用要执行类的任务:

代码语言:javascript
复制
mainMethod.invoke(null, childArgs.toArray)

整体来看,执行入口的代码还是比较清晰易懂的。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档