前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划8 | SparkEnv中RPC环境的基础构建

Spark Core源码精读计划8 | SparkEnv中RPC环境的基础构建

作者头像
大数据真好玩
发布2019-08-08 15:49:08
5490
发布2019-08-08 15:49:08
举报
文章被收录于专栏:暴走大数据暴走大数据

目录

  • 前言
  • RPC端点及其引用
    • RpcEndpoint
    • RpcEndpoint继承体系
    • RpcEndpointRef
  • NettyRpcEnv概况
    • 创建NettyRpcEnv
    • NettyRpcEnv中的属性成员
  • 总结

前言

在之前的文章中,我们由SparkContext的初始化提到了事件总线LiveListenerBus与执行环境SparkEnv。在讲解SparkEnv的过程中,RPC环境RpcEnv又是首先被初始化的重要组件。做个不怎么恰当的比较,SparkEnv之于SparkContext,正如RpcEnv之于SparkEnv。

由于RPC环境负责着Spark体系内几乎所有内部及外部通信,内容很多,所以一篇文章必然讲不完。本文还是从基础开始看起。

RPC端点及其引用

RpcEnv抽象类是Spark RPC环境的通用表示,它其中定义的setupEndpoint()方法用来向RPC环境注册一个RPC端点(RpcEndpoint),并返回其引用(RpcEndpointRef)。如果客户端要对一个RpcEndpoint发送消息,那么必须首先获得其对应的RpcEndpointRef。它们之间的关系可以用如下简图表示。

图#8.1 - RPC环境与RPC端点

既然RpcEndpoint和RpcEndpointRef是RPC环境中的基础组件,我们先来研究它们的源码。

RpcEndpoint

RpcEndpoint是一个特征,其代码如下。

代码#8.1 - o.a.s.rpc.RpcEndpoint特征

代码语言:javascript
复制
private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv

  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  def onError(cause: Throwable): Unit = {
    throw cause
  }

  def onConnected(remoteAddress: RpcAddress): Unit = { }

  def onDisconnected(remoteAddress: RpcAddress): Unit = { }

  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { }

  def onStart(): Unit = { }

  def onStop(): Unit = { }

  final def stop(): Unit = {
    val _self = self
    if (_self != null) {
      rpcEnv.stop(_self)
    }
  }
}

其中定义了如下方法,这些相当于是RPC端点在RPC环境中的“行为准则”。

  • self():取得当前RpcEndpoint对应的RpcEndpointRef。
  • receive()/receiveAndReply():接收其他RpcEndpointRef传来的消息并进行处理,receiveAndReply()方法还会发送回复。
  • onError():消息处理出现异常时调用的方法。
  • onConnected()/onDisconnected():当前RPC端点建立连接或断开连接时调用的方法。
  • onNetworkError():RPC端点的连接出现网络错误时调用的方法。
  • onStart()/onStop():RPC端点初始化与关闭时调用的方法。
  • stop():停止当前RpcEndpoint。
RpcEndpoint继承体系

RpcEndpoint的主要继承体系如下图所示。

#图8.2 - RpcEndpoint的主要继承体系

图中可以看到不少之前出现过的RPC端点,如文章#2中的HeartbeatReceiver,文章#7中的MapOutputTrackerMasterEndpoint、BlockManagerMasterEndpoint等。在今后涉及到它们时,会专门进行讲解。

另外,图中的ThreadSafeRpcEndpoint是直接继承自RpcEndpoint的特征。顾名思义,它要求RPC端点对消息的处理必须是线程安全的,用文档中的话说,线程安全RPC端点处理消息必须满足happens-before原则。

RpcEndpointRef

RpcEndpointRef是一个抽象类,其代码如下。

代码#8.2 - o.a.s.rpc.RpcEndpointRef抽象类

代码语言:javascript
复制
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {
  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

  def address: RpcAddress

  def name: String

  def send(message: Any): Unit

  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)

  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    val future = ask[T](message, timeout)
    timeout.awaitResult(future)
  }
}

这个抽象类的开头有三个属性,都是通过RpcUtils工具类从Spark配置项中取出来的,如下。

  • maxRetries:最大重连次数,对应配置项为spark.rpc.numRetries,默认值3次。
  • retryWaitMs:每次重连之前等待的时长,对应配置项为spark.rpc.retry.wait,默认值3秒。
  • defaultAskTimeout:对RPC端点进行ask()操作(下面会讲到)的默认超时时长,对应配置项为spark.rpc.askTimeout与spark.network.timeout(前者优先级高于后者),默认值120秒。

值得注意的是,maxRetries与retryWaitMs两个属性在当前的2.3.3版本中都没有用到,而在之前的版本中还是有用到的,证明Spark官方取消了RPC重试机制,也就是统一为消息传递语义中的at most once语义了。当然,我们也可以自己实现带有重试机制的RPC端点引用。

address和name方法分别返回RPC端点引用对应的地址和名称,不必多讲。下面几个方法的含义如下。

  • send()方法:异步发送一条单向的消息,并且“发送即忘记”(fire-and-forget),不需要回复。
  • ask()方法:异步发送一条消息,并在规定的超时时间内等待RPC端点的回复。RPC端点会调用receiveAndReply()方法来处理。
  • askSync()方法:是ask()方法的同步实现。由于它是阻塞操作,有可能会消耗大量时间,因此必须慎用。

RpcEndpointRef只有一个子类,即NettyRpcEndpointRef。它对send()和ask()两个方法的实现如下。

代码#8.3 - o.a.s.rpc.netty.NettyRpcEndPointRef.send()与ask()方法

代码语言:javascript
复制
  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
    nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
  }

  override def send(message: Any): Unit = {
    require(message != null, "Message is null")
    nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
  }

可见是依赖于NettyRpcEnv类的,下面来看一下它是如何创建出来的。

NettyRpcEnv概况

创建NettyRpcEnv

在文章#7的代码#7.4~#7.5中,通过工厂类NettyRpcEnvFactory的create()方法创建出了NettyRpcEnv,它是目前Spark官方提供的RPC环境的唯一实现。该方法的代码如下。

代码#8.4 - o.a.s.rpc.netty.NettyRpcEnvFactory.create()方法

代码语言:javascript
复制
  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }

可见,这个方法先创建了JavaSerializer序列化器,用于RPC传输中的序列化。然后通过NettyRpcEnv的构造方法创建NettyRpcEnv,这其中也会涉及到一些RPC基础组件的初始化,后面会讲解到。最后定义偏函数startNettyRpcEnv,并调用通用工具类Utils中的startServiceOnPort()方法来启动NettyRpcEnv。

NettyRpcEnv中的属性成员

我们暂时先不看NettyRpcEnv类的细节,而是先来看看它内部包含了哪些组件。

代码#8.5 - o.a.s.rpc.netty.NettyRpcEnv中的属性成员

代码语言:javascript
复制
  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", 0))

  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

  @volatile private var fileDownloadFactory: TransportClientFactory = _

  val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")

  private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
    "netty-rpc-connection",
    conf.getInt("spark.rpc.connect.threads", 64))

  @volatile private var server: TransportServer = _

  private val stopped = new AtomicBoolean(false)

  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
  • TransportConf:传输配置,作用在RPC环境中类似于SparkConf,负责管理与RPC相关的各种配置。
  • Dispatcher:调度器,或者叫分发器,用于将消息路由到正确的RPC端点。
  • NettyStreamManager:流式管理器,用于处理RPC环境中的文件,如自定义的配置文件或者JAR包。
  • TransportContext:传输上下文,作用在RPC环境中类似于SparkContext,负责管理RPC的服务端(TransportServer)与客户端(TransportClient),与它们之间的Netty传输管道。
  • TransportClientFactory:创建RPC客户端TransportClient的工厂。
  • TransportServer:RPC环境中的服务端,负责提供基础且高效的流式服务。

TransportConf和TransportContext提供底层的基于Netty的RPC机制,TransportClient和TransportServer则是RPC端点的最低级别抽象。

总结

本文讲解了RPC环境的基本组成部分RpcEndpoint、RpcEndpointRef的细节实现,并初步了解了NettyRpcEnv的创建过程,以及它内部包含的主要组件。虽然TransportConf和TransportContext更为基础,但为了避免嵌套太深出不来,下一篇文章暂时不准备讲它们,而主要来研究NettyRpcEnv内的调度器Dispatcher,它是整个RPC环境高效运转的基础。

— THE END —

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • RPC端点及其引用
    • RpcEndpoint
      • RpcEndpoint继承体系
        • RpcEndpointRef
        • NettyRpcEnv概况
          • 创建NettyRpcEnv
            • NettyRpcEnv中的属性成员
            • 总结
            相关产品与服务
            事件总线
            腾讯云事件总线(EventBridge)是一款安全,稳定,高效的云上事件连接器,作为流数据和事件的自动收集、处理、分发管道,通过可视化的配置,实现事件源(例如:Kafka,审计,数据库等)和目标对象(例如:CLS,SCF等)的快速连接,当前 EventBridge 已接入 100+ 云上服务,助力分布式事件驱动架构的快速构建。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档