前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >项目推荐 I 手写RPC框架(二)

项目推荐 I 手写RPC框架(二)

作者头像
用户3946442
发布2022-04-11 18:40:33
4070
发布2022-04-11 18:40:33
举报
文章被收录于专栏:程序媛驿站

前言

RPC框架代码量较多,将仅对核心过程进行梳理,完整代码见:https://github.com/wdw87/wRpc

在这篇推文中,将实现Rpc的远程通信。远程通信是 RPC 的根本,本 RPC 框架还是采用 Netty 来作为通信框架。

在本项目的系统推文中,将对项目进行详细的介绍

主要将按照下面的内容进行分配(蓝色字体可戳):

手写RPC框架(一)

RPC简介、技术栈介绍、测试Demo

手写RPC框架(二)

远程通信实现

手写RPC框架(三)

制定协议与编解码器、动态代理

手写RPC框架(四)

注册中心

Rpc框架示意图

四、实现远程通信

远程通信是 RPC 的根本,本 RPC 框架还是采用 Netty 来作为通信框架。远程通信必然会有一个 Server 和一个 Client 的实现。下面就先介绍该 RPC 框架的实现:

完整代码见:https://github.com/wdw87/wRpc

1. 服务端

服务端负责接收客户端的请求,并做出响应。

一个Netty服务端主要包括两部分组成

  • 配置服务端的启动类,比如下方的NettyServer
  • 处理请求的逻辑类, 比如下方的 ServiceRequestHandler
NettyServer
代码语言:javascript
复制
public class NettyServer {

...
   //服务端启动的核心方法
   public void start(){
...
  }

...

}

其中,start方法中的核心内容如下:

代码语言:javascript
复制
...

//Netty提供的服务端启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.SO_KEEPALIVE, true)   //启用tcp协议层面的keep-live
  .childHandler(new ChannelInitializer<SocketChannel>() {
       @Override
       protected void initChannel(SocketChannel socketChannel) throws Exception {
           //拆包Handler
           socketChannel.pipeline().addLast(new Spliter());
           //解码Handler
           socketChannel.pipeline().addLast(new Decoder());
           //编码Handler
           socketChannel.pipeline().addLast(new Encoder());
          ...

           //处理客户端请求的Handler
           socketChannel.pipeline().addLast(serviceRequestHandler);

          ...

      }
  });

...

});
//等待服务端关闭
channelFuture.channel().closeFuture().sync();

可以看出在服务端启动时,会为服务端配置相应的Handler;

当有请求到达时,这些Handler会依次对请求做相应的处理,比如首先Spliter对通信数据包进行拆包粘包,保证数据包的完整性,然后Decoder对数据包进行解码,得到请求内容,最后serviceRequestHandler处理请求内容,并且做出响应。

ServiceRequestHandler

ServiceRequestHandler类继承了Netty框架提供的SimpleChannelInboundHandler类,并且重写了三个方法:channelActive()、channelInactive()和channelRead0()。

前两个方法顾名思义,在客户端建立连接和断开连接时执行回调,最后一个方法在收到客户端请求时执行回调,是处理请求的核心方法。

代码语言:javascript
复制
public class ServiceRequestHandler extends SimpleChannelInboundHandler<ServiceRequestPacket> {

   public static final ServiceRequestHandler INSTANCE = new ServiceRequestHandler();

   private ServiceInvoker serviceInvoker = ServiceInvoker.INSTANCE;

   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
       log.info("客户端建立了连接");
       super.channelActive(ctx);
  }

   @Override
   protected void channelRead0(ChannelHandlerContext channelHandlerContext, ServiceRequestPacket serviceRequest) throws Exception {
       ServiceResponsePacket responsePacket = new ServiceResponsePacket();

       //设置响应id
       responsePacket.setRequestId(serviceRequest.getId());

      ...

       if (serviceConfig == null) {
           log.info("No such service : " + serviceRequest);
           responsePacket.setCode(1);
           responsePacket.setMessage("No such service");
      } else {
           //获取服务的实现类
           Object object = context.getBean(serviceConfig.getRef());
           //找到请求的方法
           Method method = object.getClass().getMethod(serviceRequest.getMethodName(), serviceRequest.getParameterTypes());
           //通过反射调用请求的方法,得到结果
           Object result = serviceInvoker.invoke(object, method, serviceRequest);
           log.info("service id : " + serviceRequest.getId());
           log.info("Service invoked : " + serviceRequest);
           //将请求代码(0:成功, 1:失败)和调用结果封装在响应包中
           responsePacket.setCode(0);
           responsePacket.setData(result);
      }
//将响应包通过Netty发送给客户端
       channelHandlerContext.channel().writeAndFlush(responsePacket);

  }

   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       log.info("客户端断开了连接");
       super.channelInactive(ctx);
  }
}

客户端发来的请求主要包括:

  • 请求id
  • 要请求的服务(即哪个类的哪个方法)
  • 请求参数(要传入该方法的参数)

通过对代码分析可以梳理出处理的流程:

  • 根据请求内容找到相应类的相应方法
  • 通过反射调用该方法,并传入请求中的参数
  • 得到结果,封装入响应包
  • 将响应发送给客户端

2. 客户端

客户端主要的工作是连接服务器、发送消息、等待服务端的消息响应以及该响应消息、关闭与服务端的连接。

一个 Netty 的客户端同样有两个部分:

  • 配置服务以及服务启动逻辑类,比如下方的 NettyClient 类。
  • 实现从客户端接收到的消息的处理逻辑类:比如下方的 ClientHandler 类。

完整代码见:https://github.com/wdw87/wRpc

NettyClient

代码较长,省略了非关键部分

代码语言:javascript
复制
public class NettyClient {

  ...

   public NettyClient(String host, int port) {

       this.host = host;
       this.port = port;
//启动类
       bootstrap = new Bootstrap();
       NioEventLoopGroup workerGroup = new NioEventLoopGroup();

       bootstrap.group(workerGroup)
              .channel(NioSocketChannel.class)
              .option(ChannelOption.SO_KEEPALIVE, true)
              .handler(new ChannelInitializer<SocketChannel>() {
                   @Override
                   protected void initChannel(SocketChannel socketChannel) throws Exception {

                       //拆包Handler
                       socketChannel.pipeline().addLast(new Spliter());
                       //解码Handler
                       socketChannel.pipeline().addLast(new Decoder());
                       //编码Handler
                       socketChannel.pipeline().addLast(new Encoder());

                      ...

                       //处理服务端响应的Handler
                       socketChannel.pipeline().addLast(serviceResponseHandler);

                  }
              });
       try {
           this.connect(host, port);
      } catch (InterruptedException e) {
           e.printStackTrace();
      }
  }
//连接服务端
   public void connect(String host, int port) throws InterruptedException {

      ...

       this.channel = channelFuture.sync().channel();
  }
//发送请求
   public Object send(ServiceRequestPacket requestPacket) throws InterruptedException {
       if(channel != null && channel.isActive()){
           //发送请求
           SynchronousQueue<Object> queue = serviceResponseHandler.sendRequest(requestPacket, channel);
           //阻塞等待响应包
           ServiceResponsePacket result = (ServiceResponsePacket)queue.take();
           //得到响应包中的请求结果
           Class<?> returnType = requestPacket.getReturnType();
           Object newdata = parseReturnType(returnType, result.getData());
           result.setData(newdata);
           return result;
      }else {
           ServiceResponsePacket responsePacket = new ServiceResponsePacket();
           responsePacket.setCode(1);
           responsePacket.setMessage("未正确连接到服务器.请检查相关配置信息!");
           return responsePacket;
      }

  }

  ...

}

NettyClient类虽然代码较长,但是结构十分简单,客户端在构造函数中初始化,与服务端一样,也有拆包和编解码过程,核心Handler是处理服务端响应的serviceResponseHandler。

值得注意的是,在send() 方法中,首先调用

serviceResponseHandler.sendRequest()方法,该方法会发出请求,同时将一个SynchronousQueue以请求id为key,放入一个ConcurrentHashMap中;

在客户端收到响应后,同样以请求id为key,得到这个SynchronousQueue,并放入响应包,这样在响应传回时就可以获得响应的响应包了。

ServiceResponseHandler
代码语言:javascript
复制
public class ServiceResponseHandler extends SimpleChannelInboundHandler<ServiceResponsePacket> {

   private Map<String, SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();

  ...

   @Override
   protected void channelRead0(ChannelHandlerContext channelHandlerContext, ServiceResponsePacket serviceResponsePacket) throws Exception {
       //得到请求id
       String id = serviceResponsePacket.getRequestId();
       //根据id得到相应的SynchronousQueue
       SynchronousQueue<Object> queue = queueMap.get(id);
       if(queue != null){
           //将响应包放入SynchronousQueue,之后,前文所述的send()方法将解除阻塞,并得响应
           queue.put(serviceResponsePacket);
           queueMap.remove(id);
      }else{
           log.error("request id error !!!");
      }
  }

   public SynchronousQueue<Object> sendRequest(ServiceRequestPacket requestPacket, Channel channel){
       SynchronousQueue<Object> queue = new SynchronousQueue<>();
       //以请求id为key,放入一个SynchronousQueue,此时SynchronousQueue为空队列
       queueMap.put(requestPacket.getId(), queue);
       //发出请求
       channel.writeAndFlush(requestPacket);
       return queue;
  }
}

与server中的Handler相似,在重写的channelRead0()方法中处理响应。

完整代码见:https://github.com/wdw87/wRpc

作者:好吃懒做贪玩东

编辑:西瓜媛

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-04-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序媛驿站 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 四、实现远程通信
    • 1. 服务端
      • NettyServer
      • ServiceRequestHandler
    • 2. 客户端
      • NettyClient
      • ServiceResponseHandler
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档