首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

关于Spark的RPC机制

一直以来,基于 Akka 实现的 RPC 通信框架是 Spark 引以为豪的主要特性,也是与 Hadoop 等分布式计算框架对比过程中一大亮点,但是时代和技术都在演化,从 Spark1.3.1 版本开始,为了解决大块数据(如 Shuffle )的传输问题, Spark 引入了 Netty 通信框架,到了 1.6.0 版本, Netty 完成取代了 Akka ,承担 Spark 内部所有的 RPC 通信以及数据流传输。

Spark RPC 中最为重要的三个抽象(“三剑客”)为:RpcEnv 、 RpcEndpoint 、 RpcEndpointRef ,这样做的好处有:

l 对上层的 API 来说,屏蔽了底层的具体实现,使用方便

l 可以通过不同的实现来完成指定的功能,方便扩展

l 促进了底层实现层的良性竞争, Spark 1.6.3 中默认使用了 Netty 作为底层的实现,但 Akka 的依赖仍然存在;而 Spark 2.1.0 中的底层实现只有 Netty ,这样用户可以方便的使用不同版本的 Akka 或者将来某种更好的底层实现。

Send a message locally

通过 Spark 源码中的一个 Test ( RpcEnvSuite.scala )来分析一下发送本地消息的具体流程,源码如下(对源码做了一些修改):

test( "send a message locally" ) {

@volatile var message: String = null

val rpcEndpointRef = env.setupEndpoint( "send-locally" , new RpcEndpoint {

override val rpcEnv = env

override def receive = {

//case msg: String => message = msg

case msg: String => println(message) // 我们直接将接收到的消息打印出来

}

})

rpcEndpointRef.send( "hello" )

// 下面是原来的代码

//eventually(timeout(5 seconds), interval(10 millis)) {

// assert("hello" === message)

//}

}

首先是 RpcEndpoint 创建并注册的流程:

· 1 、 创建 RpcEndpoint ,并初始化 rpcEnv 的引用( RpcEnv 已经创建好,底层实际上是实例化了一个 NettyRpcEnv ,而 NettyRpcEnv 是通过工厂方法 NettyRpcEnvFactory 创建的)

· 2 、 实例化 RpcEndpoint 之后需要向 RpcEnv 注册该 RpcEndpoint ,底层实现是向 NettyRpcEnv 进行注册,而实际上是通过调用 Dispatcher 的 registerRpcEndpoint 方法向 Dispatcher 进行注册

· 3 、 具体的注册就是向 endpoints 、 endpointRefs 、 receivers 中插入记录:而 receivers 中插入的信息会被 Dispatcher 中的线程池中的线程执行:会将记录 take 出来然后调用 Inbox 的 process 方法通过模式匹配的方法进行处理,注册的时候通过匹配到 OnStart 类型的 message ,去执行 RpcEndpoint 的 onStart 方法(例如 Master 、 Worker 注册时,就要执行各自的 onStart 方法),本例中未做任何操作

· 4 、 注册完成后返回 RpcEndpointRef ,我们通过 RpcEndpointRef 就可以向其代表的 RpcEndpoint 发送消息

下面就是通过 RpcEndpointRef 向其代表的 RpcEndpoint 发送消息的具体流程:

· 1 、 2 、调用 RpcEndpointRef 的 send 方法,底层实现是调用 Netty 的 NettyRpcEndpointRef 的 send 方法,而实际上又是调用的 NettyRpcEnv 的 send 方法,发送的消息使用 RequestMessage 进行封装:

nettyEnv.send( RequestMessage (nettyEnv.address, this , message))

· 3 、 4 、 NettyRpcEnv 的 send 方法首先会根据 RpcAddress 判断是本地还是远程调用,此处是同一个 RpcEnv ,所以是本地调用,即调用 Dispatcher 的 postOneWayMessage 方法

· 5 、 postOneWayMessage 方法内部调用 Dispatcher 的 postMessage 方法

· 6 、 postMessage 会向具体的 RpcEndpoint 发送消息,首先通过 endpointName 从 endpoints 中获得注册时的 EndpointData ,如果不为空就执行 EndpointData 中 Inbox 的 post(message) 方法,向 Inbox 的 mesages 中插入一条 InboxMessage ,同时向 receivers 中插入一条记录,此处将 Inbox 单独画出来是为了方便大家理解

· 7 、 Dispatcher 中的线程池会拿出一条线程用来循环 receivers 中的消息,首先使用 take 方法获得 receivers 中的一条记录,然后调用 Inbox 的 process 方法来执行这条记录,而 process 将 messages 中的一条 InboxMessage (第 6 步中插入的)拿出来进行处理,具体的处理方法就是通过模式匹配的方法,匹配到消息的类型(此处是 OneWayMessage ),然后来执行 RpcEndpoint 中对应的 receive 方法,在此例中我们只打印出这条消息(步骤 8 )

至此,一个简单的发送本地消息的流程执行完成。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20200427A0PTFB00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券