前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >花了一星期,自己写了个简单的RPC框架

花了一星期,自己写了个简单的RPC框架

作者头像
CBeann
发布2023-12-25 19:57:55
1660
发布2023-12-25 19:57:55
举报
文章被收录于专栏:CBeann的博客CBeann的博客

想法的来源

学了netty框架以及看了一下一小部分的netty框架的源码,听说dubbo是基于netty框架的一个优秀的落地实现,所以看了一小部分dubbo的源码,感觉学习netty总要有一个方式证明自己曾经学过,所以写下这一篇小笔记,写给自己看。

源码下载

https://github.com/cbeann/NettyRpcDemooo

前提

Zookeeper知识点

PERSISTENT

持久化节点

PERSISTENT_SEQUENTIAL

持久化排序节点

EPHEMERAL

临时节点

EPHEMERAL_SEQUENTIAL

临时排序节点

Netty知识点

https://blog.csdn.net/qq_37171353/category_9328166.html

其它参考百度。。。

Spring的生命周期

Bean的生命周期_CBeann的博客-CSDN博客

其它参考百度。。。

自定义starter

自定义redis-spring-boot-starter_CBeann的博客-CSDN博客_redis starter

其它参考百度。。。

动手实践

注意

下面只贴了部分代码和写这个小demo的想法,全部代码在 https://github.com/cbeann/NettyRpcDemooo

目标

1)zk做为注册中心

2)服务提供者和消费者支持集群和轮询调用

3)服务提供者上线和下线服务消费者可以感知

4)整合自定义starter

5)没有使用序列化,使用json字符串进行接口调用

RPC程序需要的配置参数

服务提供者

注册中心ZK的IP

注册中心ZK的端口

服务提供者的名称

服务提供者的端口(NettyServer的端口,不是SpringBoot的端口)

服务消费者

注册中心ZK的IP

注册中心ZK的端口

服务消费者的名称

zk中服务提供者的存储目录结构

如果要满足集群版本的服务提供者存储,zk的存储设计也应该好好的想一想,有一点可以明确的创建的节点有临时节点,这样服务消费者才能通过某种机制监听到服务提供者上下线并进行业务逻辑操作。

本文中的存储结构如图所示

  • /rpc为项目根目录(持久化节点)
  • /rpc/provider下存储的是服务提供者集群(持久化节点)
  • /rpc/provider/XXX存储的是服务提供者名称(持久化节点)
  • /rpc/provider/XXX/IP:端口存储的提供者名称为XXX的实例(临时节点)

服务提供者思路

服务提供者的思路其实比服务消费者简单多了,其实只需要解决把ioc容器放入到自定义的SimpleChannelInboundHandler中,然后读取json字符串获取class、方法和参数,然后在ioc容器中获取类并反射调用方法返回结果即可。

在配置文件中拿到服务提供者NettyServer中ServerBootstrap的端口,然后启动ServerBootstrap,并向zk暴露自己的服务(添加临时节点信息),服务提供者就完成了。

  • 1)在配置文件中拿到ServerBootstrap的端口,调用下面的构造器
代码语言:javascript
复制
public NettyServer(int port) {
    this.port = port;
  }
  • 2)通过ApplicationContextAware,保存ApplicationContext在ServerHander中
代码语言:javascript
复制
public class NettyServer implements ApplicationContextAware{

private ApplicationContext applicationContext;

@Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }
}
/*
 socketChannel.pipeline().addLast(new RcpServerHandler(applicationContext));
*/
  • 3)最后通过Bean的生命周期,创建Bean完毕后调用@PostConstruct的方法
代码语言:javascript
复制
/** 启动Netty服务器 */
  @PostConstruct
  public void postConstruct() {
    // 向外暴露端口,zk添加服务信息
    doExport();
    // 异步开启netty服务器
    new Thread(
            () -> {
              this.bind();//ServerBootstrap.bind(ip)
            })
        .start();
  }
  • 4)此时Netty服务器已经启动并且保存ApplicationContext在ServerHander中

部分代码如下所示:

代码语言:javascript
复制
package com.rpc.server;

import com.rpc.properties.RpcProperties;
import com.rpc.properties.RpcServerProperties;
import com.rpc.zk.ZKServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import javax.annotation.PostConstruct;
import java.net.InetAddress;

/**
 * @author chaird
 * @create 2021-02-07 2:32
 */
public class NettyServer implements ApplicationContextAware {

  private Integer port;

  private ApplicationContext applicationContext;

  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workerGroup = new NioEventLoopGroup();

  ServerBootstrap b = new ServerBootstrap();

  public NettyServer(int port) {
    this.port = port;
  }

  /** * 开启NettyServer的方法 */
  public void bind() {
    try {
      b.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 1024)
          .childHandler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                  // TimeClientHandler是自己定义的方法
                  socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                  socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                  socketChannel.pipeline().addLast(new RcpServerHandler(applicationContext));
                }
              });
      System.out.println("服务端    Netty服务器启动成功:" + port);
      // 绑定端口
      ChannelFuture f = b.bind(port).sync();
      // 等待服务端监听端口关闭
      f.channel().closeFuture().sync();
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      // 优雅关闭,释放线程池资源
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  /** 启动Netty服务器 */
  @PostConstruct
  public void postConstruct() {
    // 向外暴露端口
    doExport();
    // 异步开启netty服务器
    new Thread(
            () -> {
              this.bind();
            })
        .start();
  }

  /** 服务暴露(其实就是把服务信息保存到Zookeeper上) */
  private void doExport() {
    ZKServer zkServer = applicationContext.getBean(ZKServer.class);
    RpcServerProperties rpcServerProperties = applicationContext.getBean(RpcServerProperties.class);
    RpcProperties rpcProperties = applicationContext.getBean(RpcProperties.class);

    //   providerGroupDir = /rpc/provider/myProviderName
    String providerGroupDir = rpcProperties.getPath() + rpcProperties.getProviderPath();
    providerGroupDir = providerGroupDir + "/" + rpcServerProperties.getProviderName();

    try {
      // 创建服务名目录(用于集群)
      zkServer.createPathPermanent(providerGroupDir, "");
    } catch (Exception e) {
      e.printStackTrace();
    }

    try {
      String providerAddress = InetAddress.getLocalHost().getHostAddress();
      String providerInstance = providerAddress + ":" + rpcServerProperties.getProviderPort();
      // key(path) = /rpc/provider/myProviderName/127.0.0.1:8080   value:127.0.0.1:8080
      zkServer.createPathTemp(providerGroupDir + "/" + providerInstance, providerInstance);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }
}

服务消费者思路和难点

服务消费者则从构思遇到的难点进行拆解,服务消费者比服务提供者难多了!!!

服务消费者中怎么存储与多个服务提供者的关系?

客户端比如果你看过部分Netty的源码,那你比较容易下面的设计思路。没看过也没关系,反正我写的也是个demo,也很容易理解。

  • 首先从细粒度开始,客户端要和每一个服务提供者建立连接,即和每一个ServerBootstrap建立连接,所以会有如下的自定义实体类(ip,port)
代码语言:javascript
复制
//Netty连接服务提供者的自定义客户端
public class NettyClient {

  public NettyClient(String ip, Integer port) {
    this.ip = ip;
    this.port = port;
  }

  private String ip;
  private Integer port;

  EventLoopGroup group = new NioEventLoopGroup();
  Bootstrap b = new Bootstrap();

  //...
  
}
  • 如果我们设计的是一个服务提供者,那上面就没有问题了,如果要实现服务提供者的集群方案,那么就需要在上面的基础上再包装一层,定义为服务提供者组(集群),如下所示。
代码语言:javascript
复制
//服务提供者组(集群)
public class NettyClientGroup {

  /** 下一个下标 */
  private AtomicInteger index = new AtomicInteger(0);

  /** 服务提供者名称 */
  private String providerName;

  /** 服务提供者列表 */
  List<NettyClient> providerList = new ArrayList<>();
  /** key:服务提供者ip:端口    value:NettyClient */
  Map<String, NettyClient> providerMap = new HashMap<>();
}
  • 服务提供者的集群实现了,我可以从NettyClientGroup中选择NettyClient,但是服务提供者不是一种类型的,比如有订单服务提供者、用户服务提供者等,所以外面还要包装一层,NettyClientBootStarp,用于保存服务提供者组名称和服务提供者组的关系,如下所示
代码语言:javascript
复制
//服务消费者启动器
public class NettyClientBootStarp implements ApplicationContextAware {

  /**key:服务提供者组名称,服务提供者组*/
  Map<String, NettyClientGroup> providers = new HashMap<>();
  List<NettyClient> providerList = new ArrayList<>();

  public NettyClientBootStarp() {}

}
  • 此时,存储关系已经很明显了,下面用实例展示
服务消费者中怎么感知服务提供者上下线以及感知到如何处理?
  • 首先要监听到zk服务提供者节点的上下和下线,如下所示,其实我们是可以监听到的
代码语言:javascript
复制
//注册默认的watcher
zk = new ZooKeeper(url, 5000, watcher);
String listenProviderPath = path + providerPath;
// 给某节点添加watcher
zk.getChildren(listenProviderPath, true);
  • 监听到需要做什么业务逻辑呢?

就是把某个上线或者下线的服务提供者从NettyClientBootStarp 中移除,但是此处我用了一个比较极端的操作,即重新链接所有的服务提供者,这样就可以添加或者删除掉变动的提供者

代码语言:javascript
复制
public class RpcServiceChangeWatcher implements Watcher, ApplicationContextAware {


  private RpcProperties rpcProperties;

  private NettyClientBootStarp nettyClientBootStarp;


  @Override
  public void process(WatchedEvent event) {
    System.out.println(event);

    // 实际业务
    try {
      nettyClientBootStarp.refreshProviders();
    } catch (Exception e) {
      e.printStackTrace();
    }

    // 重新监听
    String providersPath = rpcProperties.getPath() + rpcProperties.getProviderPath();
    try {
      zkServer.getZk().getChildren(providersPath, true);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}
怎么实现服务消费者调用正常的controller就能通过NettyClient发送数据到NettyServer?

如下所示,我在SpringBoot的客户端中的HelloController中放入StudentService(接口,非实现类),怎么就能发送数据呢????

代码语言:javascript
复制
@RestController
public class HelloController {

  @RpcService("provider01") // 自定义注解,其中value为服务提供者名称,类似OpenFeign的使用
  private StudentService studentService;


  @GetMapping("/index/{id}")
  public Object hello(@PathVariable Integer id) {
    String res = studentService.getId(id);
    return res;
  }

答案:代理、反射、自定义注解

  • 首先自定义注解,其中value为服务提供者名称
代码语言:javascript
复制
/**
 * 该注解用于注入远程服务
 */
@Target(ElementType.FIELD) // 方法注解
@Retention(RetentionPolicy.RUNTIME) // 运行时注解
public @interface RpcService {

  String value();//服务提供者名称
}
  • 然后自定义BeanPostProcessor(Bean的生命周期知识点),目的是包装Bean,大体的逻辑为判断类里是否有自定义注解@RpcService,并获取注解的value,即需要调用的远程服务的名称;遍历获取标注该注解的属性,生成包含属性为(NettyClientBootStarp )的代理对象,然后注入到对象的该属性中,这样从逻辑上解决了 空指针异常
代码语言:javascript
复制
public class RcpServiceInjectBeanPostProcessor
    implements InstantiationAwareBeanPostProcessor, ApplicationContextAware {

  private ApplicationContext context;

  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName)
      throws BeansException {

    // 判断类里是否有@RpcService注解

    Class<?> clazz = context.getType(beanName);
    if (Objects.isNull(clazz)) {
      return bean;
    }
    Field[] declaredFields = clazz.getDeclaredFields();
    for (Field field : declaredFields) {
      // 找出标记了InjectService注解的属性
      RpcService injectService = field.getAnnotation(RpcService.class);
      if (injectService == null) {
        continue;
      }

      // 获取服务名称
      String providerName = injectService.value();
      // 获取接口Class
      Class<?> fieldClass = field.getType();
      // 获取nettyClient
      NettyClientBootStarp nettyClientBootStarp = context.getBean(NettyClientBootStarp.class);

      RpcFactoryProxy rpcFactoryProxy =
          new RpcFactoryProxy(fieldClass, providerName, nettyClientBootStarp);
      Object proxy = rpcFactoryProxy.getProxy();

      Object object = bean;
      field.setAccessible(true);
      try {
        // 请开始你的表演
        field.set(object, proxy);
      } catch (IllegalAccessException e) {
        e.printStackTrace();
      }
    }

    return bean;
  }

  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    return bean;
  }

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.context = applicationContext;
  }

  public RcpServiceInjectBeanPostProcessor() {
    System.out.println("-----RcpServiceInjectBeanPostProcessor-----------");
  }
}

通过反射发送数据。

(1)invoke方法可以获取到调用的接口名称、方法名称和参数

(2)代理对象构造的时候传入了需要调用的服务名称和NettyClientBootStarp

(3)在invoke中把接口名称、方法名称和参数通过Netty发送给服务提供者是可以实现的

代码语言:javascript
复制
public class RpcFactoryProxy<T> implements InvocationHandler {

  private Class<T> proxyInterface;
  // 这里可以维护一个缓存,存这个接口的方法抽象的对象

  private NettyClientBootStarp nettyClientBootStarp;

  private String serviceName;

  public RpcFactoryProxy(
      Class<T> proxyInterface, String serviceName, NettyClientBootStarp nettyClient) {
    this.serviceName = serviceName;
    this.proxyInterface = proxyInterface;
    this.nettyClientBootStarp = nettyClient;
  }

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

    System.out.println("invoke");

    Map<String, NettyClientGroup> providers = nettyClientBootStarp.getProviders();

    NettyClientGroup nettyClientGroup = providers.get(serviceName);

    if (null == nettyClientGroup) {
      RpcResponse response = RpcResponse.NO_SERVICE();
      return response.getReturnValue();
    }

    NettyClient nettyClient = nettyClientGroup.next();

    if (null == nettyClient) {
      RpcResponse response = RpcResponse.NO_SERVICE();
      return response.getReturnValue();
    }

    RpcRequest rpcRequest = new RpcRequest();
    rpcRequest.setRequestId(UUID.randomUUID().toString().substring(0, 8));
    // 设置服务名称
    rpcRequest.setServiceName(serviceName);
    // 设置是哪个类
    rpcRequest.setClazzName(proxyInterface.getName());
    // 设置哪个方法
    rpcRequest.setMethodName(method.getName());
    // 设置参数类型
    Class<?>[] parameterTypes = method.getParameterTypes();
    String[] parameterTypeString = Class2String.class2String(parameterTypes);
    rpcRequest.setParameterTypeStrings(parameterTypeString);

    // 设置参数
    rpcRequest.setParameters(args);

    // 发送消息
    RpcResponse response = nettyClient.sendMessage(rpcRequest);

    if (response == null) {
      response = RpcResponse.TIME_OUT(rpcRequest.getRequestId());
    }

    return response.getReturnValue();
  }

  public T getProxy() {
    return (T)
        Proxy.newProxyInstance(proxyInterface.getClassLoader(), new Class[] {proxyInterface}, this);
  }
}
Netty客户端的SimpleChannelInboundHandler都是打印数据,我怎么实现要调用后返回数据而不是打印数据呢?

首先自定义实现Future的实现类RpcFuture,注意这里面有一个CountDownLatch(1)

代码语言:javascript
复制
public class RpcFuture<T> implements Future<T> {

    private T response;
    /**
     * 因为请求和响应是一一对应的,所以这里是1
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);
  

    /**
     * 获取响应,直到有结果才返回
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    @Override
    public T get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return response;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (countDownLatch.await(timeout,unit)){
            return response;
        }
        return null;
    }

    public void setResponse(T response) {
        this.response = response;
        countDownLatch.countDown();
    }

    
}

当发送数据的时候创建一个RcpFuture,然后把该RcpFuture存在一个地方FuturePool,此时调用RcpFuture.get方法是阻塞的(阻塞原因CountDownLatch ),如下所示

代码语言:javascript
复制
public RpcResponse sendMessage(RpcRequest msg) {
    // 存起来
    RpcFuture<RpcResponse> future = new RpcFuture<>();
    FuturePool.put(msg.getRequestId(), future);

    RpcResponse rpcResponse = null;
    try {
      String s = JSONUtil.toJsonStr(msg);
      f.channel().writeAndFlush(s);

      rpcResponse = future.get(2000, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      FuturePool.remove(msg.getRequestId());
    }

    return rpcResponse;
  }

那么在打印的地方通过key获取RpcFuture ,然后把结果通过RpcFuture.setResponse方法设计进去,如下所示,上面就能返回结果了。

代码语言:javascript
复制
public class RpcClientHandler extends SimpleChannelInboundHandler<String> {
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

    String resp = msg;
    System.out.println("resp:"+resp);

    RpcResponse rpcResponse = JSONUtil.toBean(resp, RpcResponse.class);

    RpcFuture future = FuturePool.get(rpcResponse.getRequestId());


    future.setResponse(rpcResponse);

  }

总结

1)一个小案例的实现要整合多个知识点,并且写了这篇晦涩难懂的文章,还是花了点时间的。

2)dubbo中使用的FactoryBean实现的远程调用,可以看看人家的思路。

3)学习到了很多东西,比如我现在才知道zk的监听机制只监听一次。

4)需要用到的知识点太多了,自定义注解,BeanPostProcessor,反射,Future等,是个学习Netty不错的案例。

5)新的一年,希望大家步步高升,一年比一年好。

参考

https://github.com/2YSP/rpc-spring-boot-starter#rpc-spring-boot-starter

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-06-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 想法的来源
  • 源码下载
  • 前提
    • Zookeeper知识点
      • Netty知识点
        • Spring的生命周期
          • 自定义starter
          • 动手实践
            • 注意
              • 目标
                • RPC程序需要的配置参数
                  • 服务提供者
                  • 服务消费者
                • zk中服务提供者的存储目录结构
                  • 服务提供者思路
                    • 服务消费者思路和难点
                      • 服务消费者中怎么存储与多个服务提供者的关系?
                      • 服务消费者中怎么感知服务提供者上下线以及感知到如何处理?
                      • 怎么实现服务消费者调用正常的controller就能通过NettyClient发送数据到NettyServer?
                      • Netty客户端的SimpleChannelInboundHandler都是打印数据,我怎么实现要调用后返回数据而不是打印数据呢?
                  • 总结
                  • 参考
                  相关产品与服务
                  对象存储
                  对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档