
Spark Source Code Analysis: start-all

By 俺也想起舞, published 2019-07-24 14:37:24

A source-level walkthrough of the Spark startup process.

Version

spark-1.6

Process Analysis

start-all.sh

# Load the environment
. "${SPARK_HOME}/sbin/spark-config.sh"

# Start the Master
"${SPARK_HOME}/sbin"/start-master.sh $TACHYON_STR

# Start the Workers: this runs start-slave.sh on every host listed in conf/slaves
"${SPARK_HOME}/sbin"/start-slaves.sh $TACHYON_STR

start-master.sh

......
# The class to launch
CLASS="org.apache.spark.deploy.master.Master"

# Print usage and exit if --help or -h was passed
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  echo "Usage: ./sbin/start-master.sh [options]"
  pattern="Usage:"
  pattern+="\|Using Spark's default log4j profile:"
  pattern+="\|Registered signal handlers for"

  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
  exit 1
fi
......

In org.apache.spark.deploy.master.Master, let's start with the main() method:

  def main(argStrings: Array[String]) {
    // Register the signal logger
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    // Create the RPC environment and register the Master's communication endpoint;
    // the RPC mechanism itself will be covered in a separate post
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    // Once registered, block and wait for messages
    rpcEnv.awaitTermination()
  }

PS: Since Spark 1.6, RPC uses Netty by default instead of Akka, with a thin wrapper layer on top of Netty so Spark can be customized independently of user applications. For the rationale, see "Enable Spark user applications to use different versions of Akka".
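To make the abstraction concrete, here is a minimal sketch of my own (not from the Spark source) of how the 1.6 RPC layer is used: create an RpcEnv, register an RpcEndpoint under a name, and talk to it through the returned RpcEndpointRef. Note that org.apache.spark.rpc and SecurityManager are private[spark], so a sketch like this only compiles inside Spark's own source tree; the name "echo" and the port are arbitrary.

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

object EchoExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Same call that Master.startRpcEnvAndEndpoint makes (see below)
    val env = RpcEnv.create("echo-system", "localhost", 51234, conf,
      new SecurityManager(conf))

    // Register an endpoint under a name, just as Master registers ENDPOINT_NAME
    val ref = env.setupEndpoint("echo", new RpcEndpoint {
      override val rpcEnv: RpcEnv = env
      // receiveAndReply handles messages sent with ask(); reply() answers them
      override def receiveAndReply(ctx: RpcCallContext): PartialFunction[Any, Unit] = {
        case msg: String => ctx.reply(s"echo: $msg")
      }
    })

    // Blocking request-reply, the same pattern as BoundPortsRequest below
    println(ref.askWithRetry[String]("hello"))
    env.shutdown()
  }
}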

startRpcEnvAndEndpoint()

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    // Create the RPC environment
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    // Register the Master endpoint
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    // Blocking request-reply: ask the Master which ports it actually bound
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }

Now on to the Worker.

start-slave.sh

# Same flow as for the Master; this script is run on each worker host by start-slaves.sh
CLASS="org.apache.spark.deploy.worker.Worker"

if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  echo "Usage: ./sbin/start-slave.sh [options] <master>"
  pattern="Usage:"
  pattern+="\|Using Spark's default log4j profile:"
  pattern+="\|Registered signal handlers for"

  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
  exit 1
fi

Again, let's look at the main() method first:

  def main(argStrings: Array[String]) {
    // Exactly the same pattern as in Master
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
    rpcEnv.awaitTermination()
  }

But if every process just sits waiting for messages, no interaction can ever take place, so something else must be running during startup. Looking at the trait that Worker extends, ThreadSafeRpcEndpoint, we find an onStart() method:

  /**
   * Invoked before [[RpcEndpoint]] starts to handle any message.
   * (Called once when the endpoint is registered, before any message is processed.)
   */
  def onStart(): Unit = {
    // By default, do nothing.
  }
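In other words, an endpoint's life-cycle is: constructor -> onStart -> receive* -> onStop (this ordering is documented in the RpcEndpoint scaladoc). A hypothetical endpoint of my own, just to show where the hook fires:

import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Hypothetical endpoint: the RpcEnv invokes onStart() exactly once, after the
// endpoint is registered and before any message is dispatched to receive().
class GreetingEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = {
    // Worker uses this hook for all of its real setup work (see below)
    println("endpoint registered, safe to initialize here")
  }

  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"received: $msg")  // only ever runs after onStart()
  }
}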

So we go to Worker.onStart():

  override def onStart() {
    assert(!registered)
    // Log the worker's resources
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    logInfo("Spark home: " + sparkHome)
    // Create the worker's working directory
    createWorkDir()
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    // Register with the Master
    registerWithMaster()

    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }

Let's continue into Worker.registerWithMaster():

  private def registerWithMaster() {
    // onDisconnected may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    registrationRetryTimer match {
      case None =>
        registered = false
        // Register with every Master. Note that the Master is a single point of
        // failure, so Spark also supports an HA setup with standby Masters.
        registerMasterFutures = tryRegisterAllMasters()
        connectionAttemptCount = 0
        ......
    }
  }
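The elided part schedules a retry timer that periodically re-attempts registration until a Master answers (the real code sends itself a re-register message on an internal scheduler). Below is my own simplified sketch of that pattern; the names and limits are hypothetical, not from the Spark source.

import java.util.concurrent.{Executors, TimeUnit}

// Simplified retry pattern: keep re-sending the registration on a fixed
// schedule until it succeeds or we run out of attempts.
object RegistrationRetrySketch {
  @volatile var registered = false
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def tryRegisterAllMasters(): Unit =
    println("sending RegisterWorker to every configured master")

  def registerWithRetry(intervalSeconds: Long, maxAttempts: Int): Unit = {
    var attempts = 0
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        if (registered || attempts >= maxAttempts) {
          scheduler.shutdown()  // stop retrying once registered (or give up)
        } else {
          attempts += 1
          tryRegisterAllMasters()
        }
      }
    }, 0, intervalSeconds, TimeUnit.SECONDS)
  }
}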

Next, the tryRegisterAllMasters() method:

  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    ......
    // For each Master, send a registration request
    registerWithMaster(masterEndpoint)
    ......
  }

registerWithMaster(masterEndpoint: RpcEndpointRef)

  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    // This is where the message is sent to the Master: a RegisterWorker object
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
      .onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            handleRegisterResponse(msg)
          }
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }
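Note that ask() does not block: it returns a Future immediately, and onComplete registers a callback to run on the supplied ExecutionContext once the Master's reply (or a failure) arrives. A self-contained sketch of that pattern with plain Scala futures, using a Promise to stand in for the Master's reply:

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object AskPatternSketch {
  def main(args: Array[String]): Unit = {
    // The Promise stands in for the Master; its future is what ask() would return
    val reply = Promise[String]()
    val response: Future[String] = reply.future

    // Worker passes ThreadUtils.sameThread because the handler is cheap;
    // this sketch just uses the global pool instead
    implicit val ec: ExecutionContext = ExecutionContext.global
    response.onComplete {
      case Success(msg) => println(s"handleRegisterResponse($msg)")
      case Failure(e)   => println(s"Cannot register with master: $e")
    }

    reply.success("RegisteredWorker")  // simulate the Master replying
    Thread.sleep(100)                  // give the callback time to run
  }
}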

Once the message has been sent, the natural next step is to find where the Master receives it.

Sure enough, the Master has a receiveAndReply() method:

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // This is where Worker registration messages are received
    case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
      // First, log the Worker's resource information
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      // The Worker sent this message to every Master, so a standby Master may
      // receive it too. The Master checks its own state: if it is in STANDBY,
      // it replies MasterInStandby and does nothing else.
      if (state == RecoveryState.STANDBY) {
        context.reply(MasterInStandby)
      } else if (idToWorker.contains(id)) {
        // If this worker ID is already registered, reply with a failure
        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        // All checks passed: register the Worker's information
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerUiPort, publicAddress)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          context.reply(RegisteredWorker(self, masterWebUiUrl))
          schedule()
        } else {
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }
    }
    ......
  }
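Back on the Worker side, the three possible replies seen above (RegisteredWorker, MasterInStandby, RegisterWorkerFailed) are pattern-matched in Worker.handleRegisterResponse. Here is my simplified sketch of its shape, assuming the message types from org.apache.spark.deploy.DeployMessages (private[deploy], so again only compilable inside Spark's source tree); the println bodies stand in for the real handling logic, which records the active master and starts heartbeats.

import org.apache.spark.deploy.DeployMessages._

// Simplified sketch of Worker.handleRegisterResponse
object RegisterResponseSketch {
  def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = msg match {
    case RegisteredWorker(masterRef, masterWebUiUrl) =>
      println("accepted: remember the active master and start heartbeats")
    case MasterInStandby =>
      println("a standby master answered; keep waiting for the active one")
    case RegisterWorkerFailed(message) =>
      println(s"rejected: $message")  // e.g. "Duplicate worker ID"
  }
}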

At this point start-all is essentially finished: the Master and Workers are up and connected. What follows is resource scheduling, which will be analyzed in a later post.
