This post analyzes how a Spark cluster (Master and Worker) starts up in Standalone mode.
Startup scripts and source code analysis
start-master.sh: the main shell flow that starts the Master
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS
# spark-daemon.sh: dispatch on the requested action
case $option in
  (start)
    run_command class "$@"
    ;;
esac
run_command() {
  mode="$1"
  case "$mode" in
    (class)
      # Launch the class through bin/spark-class at the configured niceness
      execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "$@"
      ;;
  esac
}
execute_command() {
  if [ -z ${SPARK_NO_DAEMONIZE+set} ]; then
    # Unless SPARK_NO_DAEMONIZE is set, finally start the process (here the Master) as a background daemon
    nohup -- "$@" >> $log 2>&1 < /dev/null &
  fi
}
The chain above ultimately runs:
/opt/module/spark-standalone/bin/spark-class org.apache.spark.deploy.master.Master \
  --host hadoop002 \
  --port 7077 \
  --webui-port 8080
bin/spark-class then assembles and exec's the final java command:
/opt/module/jdk1.8.0_172/bin/java \
  -cp /opt/module/spark-standalone/conf/:/opt/module/spark-standalone/jars/* \
  -Xmx1g org.apache.spark.deploy.master.Master \
  --host hadoop002 \
  --port 7077 \
  --webui-port 8080
start-slaves.sh: the main shell flow that starts the Workers. It uses slaves.sh to run start-slave.sh on every slave node:
"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
# The Worker class
CLASS="org.apache.spark.deploy.worker.Worker"

if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
  # Default Worker web UI port
  SPARK_WORKER_WEBUI_PORT=8081
fi

if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
  start_instance 1 "$@"
fi

# Start a Worker instance; spark-daemon.sh is the same script that already started the Master
function start_instance {
  "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
    --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
}
The chain again ultimately runs:
/opt/module/spark-standalone/bin/spark-class org.apache.spark.deploy.worker.Worker \
  --webui-port 8081 \
  spark://hadoop002:7077
bin/spark-class then assembles and exec's the final java command:
/opt/module/jdk1.8.0_172/bin/java \
  -cp /opt/module/spark-standalone/conf/:/opt/module/spark-standalone/jars/* \
  -Xmx1g org.apache.spark.deploy.worker.Worker \
  --webui-port 8081 \
  spark://hadoop002:7077
Master startup source: org.apache.spark.deploy.master.Master
private[deploy] object Master extends Logging {
val SYSTEM_NAME = "sparkMaster"
val ENDPOINT_NAME = "Master"
// Entry point for starting the Master
def main(argStrings: Array[String]) {
Utils.initDaemon(log)
val conf = new SparkConf
// Build the argument-parsing instance, e.g. --host hadoop002 --port 7077 --webui-port 8080
val args = new MasterArguments(argStrings, conf)
// Start the RPC environment and the Master endpoint (the communication endpoint)
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}
/**
* Start the Master and return a three tuple of:
* (1) The Master RpcEnv
* (2) The web UI bound port
* (3) The REST server bound port, if any
*/
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
// Create the Master-side RpcEnv. Arguments: sparkMaster, hadoop002, 7077, conf, securityMgr
// The concrete type is NettyRpcEnv
val rpcEnv: RpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
// Create the Master object, which is an RpcEndpoint, and register it with the RpcEnv.
// This returns an RpcEndpointRef for the endpoint, used to send it messages and receive replies.
val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
// Ask the Master endpoint for its bound ports (a BoundPortsResponse).
// BoundPortsResponse is a case class with three fields: rpcEndpointPort, webUIPort, restPort
val portsResponse: BoundPortsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
}
Creating the RpcEnv
The actual creation is delegated to NettyRpcEnvFactory.create. While the NettyRpcEnv is being created, it also creates the message dispatcher, the inboxes, and the map from remote addresses to outboxes.
RpcEnv.scala
def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean): RpcEnv = {
// Bundle up the RpcEnv configuration
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
clientMode)
// Create the NettyRpcEnv
new NettyRpcEnvFactory().create(config)
}
NettyRpcEnvFactory
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
def create(config: RpcEnvConfig): RpcEnv = {
val sparkConf = config.conf
// Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
// KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
// Serializer used for objects sent over RPC
val javaSerializerInstance: JavaSerializerInstance = new JavaSerializer(sparkConf)
.newInstance()
.asInstanceOf[JavaSerializerInstance]
// Instantiate the NettyRpcEnv
val nettyEnv = new NettyRpcEnv(
sparkConf,
javaSerializerInstance,
config.advertiseAddress,
config.securityManager)
if (!config.clientMode) {
// Define the function that starts the NettyRpcEnv on a given port
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
// Start the NettyRpcEnv
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv
}
}
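To make the dispatcher/inbox/outbox relationship concrete, here is a toy model in plain Scala (illustrative names only, not Spark's actual classes): each local endpoint gets an inbox for incoming messages, each remote address gets an outbox for outgoing ones, and dispatching a message means routing it to the right inbox by endpoint name.

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.concurrent.TrieMap

object RpcEnvModel extends App {
  type Inbox = LinkedBlockingQueue[Any]
  val inboxes  = TrieMap.empty[String, Inbox]      // endpoint name -> inbox
  val outboxes = TrieMap.empty[String, List[Any]]  // remote address -> pending messages

  // Registering a local endpoint gives it an inbox the dispatcher can route to
  def setupEndpoint(name: String): Inbox =
    inboxes.getOrElseUpdate(name, new Inbox)

  // Dispatch: deliver a message to the named local endpoint's inbox
  def dispatch(endpointName: String, msg: Any): Unit =
    inboxes.get(endpointName).foreach(_.put(msg))

  // Sending to a remote endpoint would instead queue into that address's outbox
  def sendRemote(address: String, msg: Any): Unit =
    outboxes.put(address, msg :: outboxes.getOrElse(address, Nil))

  val masterInbox = setupEndpoint("Master")
  dispatch("Master", "BoundPortsRequest")
  println(masterInbox.take()) // prints: BoundPortsRequest
}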
Master is an RpcEndpoint.
Its lifecycle methods run in this order:
constructor -> onStart -> receive* -> onStop
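A minimal sketch of this lifecycle (a simplified stand-in trait; Spark's real RpcEndpoint also has receiveAndReply, onError, onConnected, and more):

trait ToyEndpoint {
  def onStart(): Unit = {}                 // called once, after registration
  def receive: PartialFunction[Any, Unit]  // called for every incoming message
  def onStop(): Unit = {}                  // called once, on shutdown
}

object LifecycleDemo extends App {
  val master = new ToyEndpoint {           // "constructor" runs first
    override def onStart(): Unit = println("onStart: bind web UI, schedule checks")
    override def receive: PartialFunction[Any, Unit] = {
      case msg => println(s"receive: $msg")
    }
    override def onStop(): Unit = println("onStop: clean up")
  }
  master.onStart()                         // then onStart
  master.receive("CheckForWorkerTimeOut")  // then receive*, once per message
  master.onStop()                          // finally onStop
}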
Key code from onStart:
// Create the web UI server
webUi = new MasterWebUI(this, webUiPort)
// Periodically check whether any Worker has timed out, by sending ourselves a
// CheckForWorkerTimeOut message; by default the check runs once per minute.
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = Utils.tryLogNonFatalError {
    // CheckForWorkerTimeOut is handled in the receive method
    self.send(CheckForWorkerTimeOut)
  }
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
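This "send a message to yourself on a timer" pattern keeps every state change on the single message-processing path, so no extra locking is needed. A self-contained sketch of the same idea (names are illustrative):

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object SelfMessageDemo extends App {
  val inbox     = new LinkedBlockingQueue[String]()
  val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Like forwardMessageThread: periodically enqueue the check message
  // (every second here; the Master uses WORKER_TIMEOUT_MS, one minute by default)
  scheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = inbox.put("CheckForWorkerTimeOut")
  }, 0, 1, TimeUnit.SECONDS)

  // Like receive: the only place the (imaginary) worker state is touched
  for (_ <- 1 to 2) {
    inbox.take() match {
      case "CheckForWorkerTimeOut" => println("timeOutDeadWorkers()")
    }
  }
  scheduler.shutdown()
}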
The method that checks for and removes timed-out Workers:
/** Check for, and remove, any timed-out workers */
private def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it
val currentTime = System.currentTimeMillis()
// Collect the Workers whose last heartbeat is older than the timeout
val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
for (worker <- toRemove) {
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT_MS / 1000))
removeWorker(worker)
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
}
}
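With the defaults (WORKER_TIMEOUT_MS = 60 s; REAPER_ITERATIONS comes from spark.dead.worker.persistence and, assuming Spark 2.x, defaults to 15), the two thresholds above work out as follows:

object TimeoutMath extends App {
  val WORKER_TIMEOUT_MS = 60 * 1000L // spark.worker.timeout default: 60 s
  val REAPER_ITERATIONS = 15         // spark.dead.worker.persistence default

  // A live Worker is removed after missing heartbeats for one minute
  println(s"remove after: ${WORKER_TIMEOUT_MS / 1000} s")
  // A Worker already marked DEAD stays visible (e.g. in the web UI) until
  // (15 + 1) * 60 s = 960 s, i.e. 16 minutes, then it is culled for good
  println(s"cull DEAD after: ${(REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS / 1000} s")
}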
At this point the Master has started.
Worker startup source: org.apache.spark.deploy.worker.Worker
private[deploy] object Worker extends Logging {
val SYSTEM_NAME = "sparkWorker"
val ENDPOINT_NAME = "Worker"
def main(argStrings: Array[String]) {
Utils.initDaemon(log)
val conf = new SparkConf
// Build the argument-parsing instance
val args = new WorkerArguments(argStrings, conf)
// Start the RPC environment and the Worker endpoint
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir, conf = conf)
rpcEnv.awaitTermination()
}
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
// Create the RpcEnv instance. Arguments: "sparkWorker", the host, the RPC port, conf, securityMgr
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
// Convert masterUrls (the Master addresses passed on the command line) into RpcAddresses
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
// Finally instantiate the Worker and register it as an RpcEndpoint
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}
}
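The masterUrls-to-masterAddresses step is just URL parsing. A toy re-implementation of what RpcAddress.fromSparkURL does (illustrative, not Spark's code):

object SparkUrlDemo extends App {
  case class RpcAddress(host: String, port: Int)

  // Parse "spark://host:port" into a host/port pair
  def fromSparkURL(url: String): RpcAddress = {
    val uri = new java.net.URI(url)
    require(uri.getScheme == "spark", s"Invalid master URL: $url")
    RpcAddress(uri.getHost, uri.getPort)
  }

  println(fromSparkURL("spark://hadoop002:7077")) // RpcAddress(hadoop002,7077)
}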
The onStart method:
override def onStart() {
  // On first start, assert that this Worker has not registered yet
  assert(!registered)
  // Create the working directory
  createWorkDir()
  // Start the external shuffle service, if enabled
  shuffleService.startIfEnabled()
  // The Worker's web UI
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()
  workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
  // Register this Worker with the Master
  registerWithMaster()
}
The registerWithMaster method
Key code:
// Register with all the Masters
registerMasterFutures = tryRegisterAllMasters()
The tryRegisterAllMasters() method:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    // Submit the registration to a thread pool, one task per Master
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          // Resolve an RpcEndpointRef for the Master from its address, so we can talk to it
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          // Register with this Master
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
The registerWithMaster(masterEndpoint) overload:
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  // Send a RegisterWorker message to the Master's receiveAndReply method.
  // It carries the Worker's info: id, host, port, cores, memory and web UI URL.
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      // Registration succeeded
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      // Registration failed
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
The Master's receiveAndReply method:
// Handle a Worker's registration request
case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
  if (state == RecoveryState.STANDBY) {
    // Reply to the sender; the Worker receives this response and ignores standby Masters
    context.reply(MasterInStandby)
  } else if (idToWorker.contains(id)) { // The Worker id is already registered
    context.reply(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    // Wrap the incoming information in a WorkerInfo
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    if (registerWorker(worker)) { // Registration succeeded
      persistenceEngine.addWorker(worker)
      // Send the success response
      context.reply(RegisteredWorker(self, masterWebUiUrl))
      schedule()
    } else {
      val workerAddress = worker.endpoint.address
      context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }
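Summarizing the handshake, the possible replies form a small message protocol. A simplified sketch of the types involved (field lists abbreviated from the real case classes):

// Worker -> Master request (abbreviated; the real message also carries
// the Worker's RpcEndpointRef so the Master can reply and reconnect)
case class RegisterWorker(id: String, host: String, port: Int,
                          cores: Int, memory: Int, workerWebUiUrl: String)

// Master -> Worker replies
sealed trait RegisterWorkerResponse
case object MasterInStandby extends RegisterWorkerResponse // standby Master: ignore
case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse
case class RegisteredWorker(masterWebUiUrl: String) extends RegisterWorkerResponse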
The Worker's handleRegisterResponse method:
case RegisteredWorker(masterRef, masterWebUiUrl) =>
  logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
  // Mark this Worker as registered
  registered = true
  // Record the active Master
  changeMaster(masterRef, masterWebUiUrl)
  // Schedule sending ourselves SendHeartbeat; by default the heartbeat fires 4 times per minute
  forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(SendHeartbeat)
    }
  }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
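The "4 times per minute" comes from deriving the heartbeat interval from the Worker timeout, so a Worker gets several attempts in before the Master would time it out (assuming the default spark.worker.timeout of 60 s):

object HeartbeatMath extends App {
  val workerTimeoutSec = 60L                           // spark.worker.timeout default
  val heartbeatMillis  = workerTimeoutSec * 1000 / 4   // HEARTBEAT_MILLIS = 15000
  println(s"send Heartbeat every $heartbeatMillis ms") // every 15 s, 4x per minute
}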
The Worker's receive method:
case SendHeartbeat =>
if (connected) {
// Send a heartbeat to the Master
sendToMaster(Heartbeat(workerId, self))
}
The Master's receive method:
case Heartbeat(workerId, worker) =>
  idToWorker.get(workerId) match {
    case Some(workerInfo) =>
      // Record this Worker's latest heartbeat time
      workerInfo.lastHeartbeat = System.currentTimeMillis()
    case None => // Unknown worker id; the full source logs a warning and may ask the Worker to re-register
  }
At this point the Worker has started. That concludes this post.