首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark RPC原理

spark RPC原理

作者头像
俺也想起舞
发布2019-07-24 14:37:42
9430
发布2019-07-24 14:37:42
举报

Spark中的RPC交互过程

1. 概述

Spark-1.6以后RPC默认使用Netty替代Akka,在Netty上加了一层封装,为实现对Spark的定制开发,所以了解Spark中RPC的原理还是有必要的

Akka是一个异步的消息框架,所谓的异步,简言之就是消息发送方发送出消息,不用阻塞等待结果,接收方处理完返回结果即可。Akka支持百万级的消息传递,特别适合复杂的大规模分布式系统。Akka基于Actor模型,提供用于创建可扩展,弹性,快速响应的应用程序的平台。Actor封装了状态和行为的对象,不同的Actor之间可通过消息交换实现通信,每个Actor都有自己的消息收件箱。Akka可以简化并发场景下的开发,其异步,高性能的事件驱动模型,轻量级的事件处理可大大方便用于开发复杂的分布式系统。早期Spark大量采用Akka作为RPC。Netty也是一个知名的高性能,异步消息框架,Spark早期便使用它解决大文件传输问题,用来克服Akka的短板。根据社区的说法,因为很多Spark用户饱受Akka复杂依赖关系的困扰,所以后来干脆就直接用Netty代替了Akka。

2. Spark 1.6+ 中的RPC

2.1 RpcEndpoint

用来通讯的个体(Master,Worker,Diver),一个RpcEndpoint的生命周期包括:onStart()->recevie->onStop(),当然也还有很多其他方法,后面会有介绍

2.2 RpcEndpointRef

RpcEndpoint的一个引用,当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。

2.3 RpcAddress

RpcEndpointRef的地址,Host + Port。

2.4 RpcEnv

RpcEnv为RpcEndpoint提供处理消息的环境。RpcEnv负责RpcEndpoint整个生命周期的管理,包括:注册endpoint,endpoint之间消息的路由,以及停止endpoint。

3. RPC网络通信的抽象图

  1. 核心的RpcEnv是一个特质(trait),它主要提供了停止,注册,获取endpoint等方法的定义,而NettyRpcEnv提供了该特质的一个具体实现。
  2. 通过工厂RpcEnvFactory来产生一个RpcEnv,而NettyRpcEnvFactory用来生成NettyRpcEnv的一个对象。
  3. 当我们调用RpcEnv中的setupEndpoint来注册一个endpoint到rpcEnv的时候,在NettyRpcEnv内部,会将该endpoint的名称与其本省的映射关系,rpcEndpoint与rpcEndpointRef之间映射关系保存在dispatcher对应的成员变量中。

3. Master具体实现

3.1 Master类的定义
private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
  ......

可以看到Master继承ThreadSafeRpcEndpoint,而ThreadSafeRpcEndpoint继承RpcEndpoint,跟上面的抽象图差不多

3.2 Master启动
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }

会通过NettyRpcEnvFactory创建一个RpcEnv,rcpEnv在创建一个Endpoint,也就是Master了

3.3 RpcEndpoint特质

master的启动会创建一个RpcEnv并将自己注册到其中。继续看RpcEndpoint特质的定义:

private[spark] trait RpcEndpoint {
  //当前RpcEndpoint注册到的RpcEnv主子,可以类比为Akka中的actorSystem
  val rpcEnv: RpcEnv
  //直接用来发送消息的RpcEndpointRef,可以类比为Akka中的actorRef
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }
  //处理来自RpcEndpointRef.send或者RpcCallContext.reply的消息
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }
  //处理来自RpcEndpointRef.ask的消息,会有相应的回复
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }
  //篇幅限制,其余onError,onConnected,onDisconnected,onNetworkError,
  //onStart,onStop,stop方法此处省略
}
3.4 RpcEnv抽象类
private[spark] abstract class RpcEnv(conf: SparkConf) {
  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
  //返回endpointRef
  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
  //返回RpcEnv监听的地址
  def address: RpcAddress
  //注册一个RpcEndpoint到RpcEnv并返回RpcEndpointRef
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
  //通过uri异步地查询RpcEndpointRef
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
  //通过uri查询RpcEndpointRef,这种方式会产生阻塞
  def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
  }
  //通过address和endpointName查询RpcEndpointRef,这种方式会产生阻塞
  def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
    setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
  }
  //关掉endpoint
  def stop(endpoint: RpcEndpointRef): Unit
  //关掉RpcEnv
  def shutdown(): Unit
  //等待结束
  def awaitTermination(): Unit
  //没有RpcEnv的话RpcEndpointRef是无法被反序列化的,这里是反序列化逻辑
  def deserialize[T](deserializationAction: () => T): T
  //返回文件server实例
  def fileServer: RpcEnvFileServer
  //开一个针对给定URI的channel用来下载文件
  def openChannel(uri: String): ReadableByteChannel
}

另外RpcEnv有一个伴生对象,实现了create方法:

private[spark] object RpcEnv {
  def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false): RpcEnv = {
    val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
    new NettyRpcEnvFactory().create(config)
  }
}

这就是在master启动方法中的create具体实现,可以看到调用了Netty工厂方法NettyRpcEnvFactory,该方法是对Netty的具体封装。

3.5 master中消息处理

上文可以看到,在RpcEndpoint中最核心的便是receive和receiveAndReply方法,定义了消息处理的核心逻辑,master中也有相应的实现:

override def receive: PartialFunction[Any, Unit] = {
    case ElectedLeader =>      
    case CompleteRecovery => 
    case RevokedLeadership =>
    case RegisterApplication(description, driver) =>
    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    case DriverStateChanged(driverId, state, exception) =>
    case Heartbeat(workerId, worker) =>
    case MasterChangeAcknowledged(appId) =>
    case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
    case WorkerLatestState(workerId, executors, driverIds) =>
    case UnregisterApplication(applicationId) =>
    case CheckForWorkerTimeOut =>
}

这里定义了master一系列的消息处理逻辑,而receiveAndReply中,

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    case RequestSubmitDriver(description) =>
    case RequestKillDriver(driverId) =>
    case RequestDriverStatus(driverId) =>
    case RequestMasterState =>
    case BoundPortsRequest =>
    case RequestExecutors(appId, requestedTotal) =>
    case KillExecutors(appId, executorIds) =>
}

定义了对需要回复的消息组的处理逻辑。

在看看Worker的实现

3.6 Worker
private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {

与Master一样

3.7 worker启动方法
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf): RpcEnv = {
  // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
  val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
  rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
  rpcEnv
}

和Master一样,但是Worker会在onStart()向Master进行注册

3.8 worker注册到master RpcEnv
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}

masterEndpoint.ask是核心,发送了一个RegisterWorker消息到masterEndpoint并期待对方的RegisterWorkerResponse,对response做出相应的处理。这样worker就成功和master建立了连接,它们之间可以互相发送消息进行通信。

3.9worker到master的通信

worker和master之间是一个主从关系,worker注册到master之后,master就可以通过消息传递实现对worker的管理,在worker中有一个方法:

private def sendToMaster(message: Any): Unit = {
  master match {
    case Some(masterRef) => masterRef.send(message)
    case None =>
      logWarning(
        s"Dropping $message because the connection to master has not yet been established")
  }
}

一目了然,就是干的发送消息到master的活儿,在worker中很多地方都用到这个方法,比如handleExecutorStateChanged(executorStateChanged:ExecutorStateChanged)方法中,sendToMaster(executorStateChanged)就向masterRef发送了executorStateChanged消息,前文中master中的recevie方法中,就有一个对ExecutorStateChanged消息的处理逻辑。

3.10master到worker的通信

同样的,master要对worker实现管理也是通过发送消息实现的,比如launchExecutor(worker: WorkerInfo, exec: ExecutorDesc)方法中:

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  //向worker发送LaunchExecutor消息
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

master向worker发送了LaunchExecutor消息告诉worker应该启动executor了,而worker中的receive方法中对LaunchExecutor消息进行处理并完成master交代给自己的任务。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. Spark 1.6+ 中的RPC
    • 2.1 RpcEndpoint
      • 2.2 RpcEndpointRef
        • 2.3 RpcAddress
          • 2.4 RpcEnv
          • 3. RPC网络通信的抽象图
          • 3. Master具体实现
            • 3.1 Master类的定义
              • 3.2 Master启动
                • 3.3 RpcEndpoint特质
                  • 3.4 RpcEnv抽象类
                    • 3.5 master中消息处理
                      • 3.6 Worker
                        • 3.7 worker启动方法
                          • 3.8 worker注册到master RpcEnv
                            • 3.9worker到master的通信
                              • 3.10master到worker的通信
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档