专栏首页后端技术学习Netty实现简单RPC调用

Netty实现简单RPC调用

我们知道Dubbo是一个RPC框架,那RPC框架需要实现什么?需要实现的是调用远程服务和本地服务一样方便,同时提高调用远程服务的性能。而服务端和客户端之间的关系,其实就是一个生产和消费的关系。

客户端与服务端交互关系图

1.服务消费方以本地调用方式调用服务
2.client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
3.client stub将消息进行编码并发送到服务端
4.server stub 根据解码结果调用本地的服务
5.server stub将返回导入结果进行编码并发送至消费方
6.本地服务执行并将结果返回给server stub
7.server stub将返回导入结果进行编码并发送至消费方
8.client stub接收到消息并进行解码
9.服务消费方(client)得到结果

RPC的目标是将2-8步骤进行封装,用户无需关系这些细节,也即实现远程调用和调用本地方法一样。

Server服务提供方

server
/**
 * 服务提供方
 * @author Administrator
 *
 */
public interface HelloNetty {
  String hello();
}

/**
 * 实现HelloNetty接口
 * @author Administrator
 *
 */
public class HelloNettyImpl implements HelloNetty {

    @Override
    public String hello() {
        return "hello,netty";
    }
}

/**
 * 服务提供方
 * @author Administrator
 *
 */
public interface HelloRPC {
    String hello(String name);
}

/**
 * HelloRPC接口的实现
 * @author Administrator
 *
 */
public class HelloRPCImpl implements HelloRPC {

    @Override
    public String hello(String name) {
        return "hello,"+name;
    }
}
Server Stub

封装需要传递的消息实体类

/**
 * 封装类信息,实体类用来封装消费方发起远程调用时传给服务方的数据
 * @author Administrator
 *
 */
//封装类信息
public class ClassInfo implements Serializable {

  private static final long serialVersionUID = 1L;

  private String className;  //类名
  private String methodName;//方法名
  private Class<?>[] types; //参数类型
  private Object[] objects;//参数列表

  public String getClassName() {
      return className;
  }

  public void setClassName(String className) {
      this.className = className;
  }

  public String getMethodName() {
      return methodName;
  }

  public void setMethodName(String methodName) {
      this.methodName = methodName;
  }

  public Class<?>[] getTypes() {
      return types;
  }

  public void setTypes(Class<?>[] types) {
      this.types = types;
  }

  public Object[] getObjects() {
      return objects;
  }

  public void setObjects(Object[] objects) {
      this.objects = objects;
  }
}
服务器端业务处理
/**
 * 服务器端业务处理类
 * @author Administrator
 *
 */
public class InvokeHandler extends ChannelInboundHandlerAdapter{
    //得到某接口下某个实现类的名字
    private String getImplClassName(ClassInfo classInfo) throws Exception{
        //服务方接口和实现类所在的包路径
        String interfacePath="com.study.nettyRpc.server";
        int lastDot = classInfo.getClassName().lastIndexOf(".");
        String interfaceName=classInfo.getClassName().substring(lastDot);
        Class superClass=Class.forName(interfacePath+interfaceName);
        Reflections reflections = new Reflections(interfacePath);
        //得到某接口下的所有实现类
        Set<Class> ImplClassSet=reflections.getSubTypesOf(superClass);
        if(ImplClassSet.size()==0){
            System.out.println("未找到实现类");
            return null;
        }else if(ImplClassSet.size()>1){
            System.out.println("找到多个实现类,未明确使用哪一个");
            return null;
        }else {
            //把集合转换为数组
            Class[] classes=ImplClassSet.toArray(new Class[0]);
            return classes[0].getName(); //得到实现类的名字
        }
    }

    @Override  //读取客户端发来的数据并通过反射调用实现类的方法
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ClassInfo classInfo = (ClassInfo) msg;
        Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
        Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
        //通过反射调用实现类的方法
        Object result = method.invoke(clazz, classInfo.getObjects());
        ctx.writeAndFlush(result);
    }
}
网络处理服务器Server端
/**
 * 网络处理服务器
 * @author Administrator
 *
 */
public class NettyRPCServer {
  private int port;
  public NettyRPCServer(int port) {
      this.port = port;
  }

  public void start() {
      EventLoopGroup bossGroup = new NioEventLoopGroup();
      EventLoopGroup workerGroup = new NioEventLoopGroup();
      try {
          ServerBootstrap serverBootstrap = new ServerBootstrap();
          serverBootstrap.group(bossGroup, workerGroup)
                  .channel(NioServerSocketChannel.class)
                  .option(ChannelOption.SO_BACKLOG, 128)
                  .childOption(ChannelOption.SO_KEEPALIVE, true)
                  .localAddress(port).childHandler(
                          new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel ch) throws Exception {
                                  ChannelPipeline pipeline = ch.pipeline();
                                  //编码器
                                  pipeline.addLast("encoder", new ObjectEncoder());
                                  //解码器
                                  pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                  //服务器端业务处理类
                                  pipeline.addLast(new InvokeHandler());
                              }
                          });
          ChannelFuture future = serverBootstrap.bind(port).sync();
          System.out.println("......server is ready......");
          future.channel().closeFuture().sync();
      } catch (Exception e) {
          //优雅关闭
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
      }
  }

  public static void main(String[] args) throws Exception {
      new NettyRPCServer(9999).start();
  }
}

Client客户端

client stub

客户端代理类
/**
 * 客户端代理类
 * @author Administrator
 *
 */
public class NettyRPCProxy {
  //根据接口创建代理对象
  public static Object create(Class target){
      return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler(){

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            //封装ClassInfo
            ClassInfo classInfo = new ClassInfo();
            classInfo.setClassName(target.getName());
            classInfo.setMethodName(method.getName());
            classInfo.setObjects(args);
            classInfo.setTypes(method.getParameterTypes());

            //开始用Netty发送数据
            EventLoopGroup group = new NioEventLoopGroup();
            ResultHandler resultHandler = new ResultHandler();
            try{
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>(){

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //编码器
                        pipeline.addLast("encoder",new ObjectEncoder());
                        //解码器
                        pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));

                        //客户端业务处理类
                        pipeline.addLast("handler",resultHandler);
                    }

                 });
                ChannelFuture future = b.connect("127.0.0.1",9999).sync();
                future.channel().writeAndFlush(classInfo).sync();
                future.channel().closeFuture().sync();
            }finally{
                group.shutdownGracefully();
            }
            return resultHandler.getResponse();
        }             
      });
  }
}
客户端业务处理类
/**
 * 客户端业务处理类
 * @author Administrator
 *
 */
public class ResultHandler extends ChannelInboundHandlerAdapter{

    private Object response;
    public Object getResponse(){
        return response;
    }

    //读取服务器端返回的数据(远程调用的结果)
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        response = msg;
        ctx.close();
    }
}
客户端调用
/**
 * 服务调用方
 * 
 * @author Administrator
 *
 */
public class TestNettyRPC {
    public static void main(String[] args) {

        //第1次远程调用
        HelloNetty helloNetty=(HelloNetty) NettyRPCProxy.create(HelloNetty.class);
        System.out.println(helloNetty.hello());

        //第2次远程调用
        HelloRPC helloRPC =  (HelloRPC) NettyRPCProxy.create(HelloRPC.class);
        System.out.println(helloRPC.hello("RPC"));

    }
}

启动之后运行结果

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-04-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • rocketmq学习2

    从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfi...

    路行的亚洲
  • java8学习整理

    由于项目中使用java8中的lambda表达式,因此想学习一下java8中的lambda表达式和stream流。

    路行的亚洲
  • CompletableFuture学习

    前面我们已经知道CompletionService是可以解决Future带来的阻塞问题的,同时我们除了前面我们看到的take方法之外,还可以使用poll方法,这...

    路行的亚洲
  • 类的继承,文件读写

    设计思想:用随机文件流把文件正向读出并保存到了字符串中,将字符串倒序,显示到控制台。

    达达前端
  • 2019Java面试代码与编程题

    代码与编程题 135、写一个Singleton出来 Singleton模式主要作用是保证在Java应用程序中,一个类Class只有一个实例存在。 一般Si...

    葆宁
  • Java中的语法糖

    语法糖方便了程序员的开发,提高了开发效率,提升了语法的严谨也减少了编码出错误的几率。我们不仅仅在平时的编码中依赖语法糖,更要看清语法糖背后程序代码的真实结构,这...

    哲洛不闹
  • 一个简单的支持MySQL和SQLite3的DB接口

    simple_db.zip 相关联代码:https://github.com/eyjian/mooon/tree/master/common_library/...

    一见
  • RxJava1 升级到 RxJava2 所踩过的坑

    RxJava2 发布已经有一段时间了,是对 RxJava 的一次重大的升级,由于我的一个库cv4j使用了 RxJava2 来尝鲜,但是 RxJava2 跟 Rx...

    fengzhizi715
  • 用户尝试登陆错误次数

    由于我们将使用shiro + ehache配合使用,所以可以不用单独再引用ehcache.jar了,使用shiro-ehcache时,会自动添加ehcache-...

    微醺
  • 从0.5到1写个rpc框架 - 5:服务监控和管理(actuator)

    springboot项目中只要引入spring-boot-starter-actuator就可以得到一些管理服务的接口,比如停止服务,获取服务信息等。他用的并不...

    acupt

扫码关注云+社区

领取腾讯云代金券