造个轮子之基于 Netty 实现自己的 RPC 框架

服务端开发都会或多或少的涉及到 RPC 的使用,当然如果止步于会用,对自己的成长很是不利,所以楼主今天本着知其然,且知其所以然的精神来探讨一下 RPC 这个东西。

child-rpc模型

child-rpc 采用 socket 直连的方式来实现服务的远程调用,然后使用 jdk 动态代理的方式让调用者感知不到远程调用。

child-rpc 开箱使用

发布服务

RPC 服务类要监听指定IP端口,设置要发布的服务的实现及其接口的引用,并指定序列化的方式,目前 child-rpc 支持 Hessian,JACKSON 两种序列化方式。

/**
 * @author wuhf
 * @Date 2018/9/1 18:30
 **/
public class ServerTest {

    public static void main(String[] args) {
        ServerConfig serverConfig = new ServerConfig();
        serverConfig.setSerializer(Serializer.SerializeEnum.HESSIAN.serializer)
                .setPort(5201)
                .setInterfaceId(HelloService.class.getName())
                .setRef(HelloServiceImpl.class.getName());
        ServerProxy serverProxy = new ServerProxy(new NettyServer(),serverConfig);
        try {
            serverProxy.export();
            while (true){

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

引用服务

RPC 客户端要链接远程 IP 端口,并注册要引用的服务,然后调用 sayHi 方法,输出结果

/**
 * @author wuhf
 * @Date 2018/9/1 18:31
 **/
public class ClientTest {

    public static void main(String[] args) {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setHost("127.0.0.1")
                .setPort(5201)
                .setTimeoutMillis(100000)
                .setSerializer(Serializer.SerializeEnum.HESSIAN.serializer);
        ClientProxy clientProxy = new ClientProxy(clientConfig,new NettyClient(),HelloService.class);
        for (int i = 0; i < 10; i++) {
            HelloService helloService = (HelloService) clientProxy.refer();
            System.out.println(helloService.sayHi());
        }
    }
}

运行

server 端输出

client 端输出

child-rpc 具体实现

RPC 请求,响应消息实体定义

定义消息请求响应格式,消息类型、消息唯一 ID 和消息的 json 序列化字符串内容。消息唯一 ID 是用来客户端验证服务器请求和响应是否匹配。

// rpc 请求
public class RpcRequest implements Serializable {
    private static final long serialVersionUID = -4364536436151723421L;

    private String requestId;
    private long createMillisTime;
    private String className;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;

    // set get 方法省略掉
}
// rpc 响应
public class RpcResponse implements Serializable {
    private static final long serialVersionUID = 7329530374415722876L;

    private String requestId;
    private Throwable error;
    private Object result;
    // set get 方法省略掉
}

网络传输过程中的编码解码

消息编码解码使用自定义的编解码器,根据服务初始化是使用的序列化器来将数据序列化成字节流,拆包的策略是设定指定长度的数据包,对 socket 粘包,拆包感兴趣的小伙伴请移步 Socket 中粘包问题浅析及其解决方案

下面是解码器代码实现 :

public class NettyDecoder extends ByteToMessageDecoder {

    private Class<?> genericClass;
    private Serializer serializer;

    public NettyDecoder(Class<?> genericClass, Serializer serializer) {
        this.genericClass = genericClass;
        this.serializer = serializer;
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if (byteBuf.readableBytes() < 4) {
            return;
        }

        byteBuf.markReaderIndex();
        // 读取消息长度
        int dataLength = byteBuf.readInt();
        
        if (dataLength < 0) {
            channelHandlerContext.close();
        }

        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();
            return;
        }

        try {
            byte[] data = new byte[dataLength];
            byteBuf.readBytes(data);
            Object object = serializer.deserialize(data,genericClass);
            list.add(object);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

下面是编码器的实现:

public class NettyEncoder extends MessageToByteEncoder<Object> {

    private Class<?> genericClass;
    private Serializer serializer;

    public NettyEncoder(Class<?> genericClass,Serializer serializer) {
        this.serializer = serializer;
        this.genericClass = genericClass;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object object, ByteBuf byteBuf) throws Exception {
        if (genericClass.isInstance(object)) {
            byte[] data = serializer.serialize(object);
            byteBuf.writeInt(data.length);
            byteBuf.writeBytes(data);
        }
    }
}

RPC 业务逻辑处理 handler

server 端业务处理 handler 实现 : 主要业务逻辑是 通过 java 的反射实现方法的调用。

public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {
        // invoke 通过调用反射方法获取 rpcResponse
        RpcResponse response = RpcInvokerHandler.invokeService(rpcRequest);
        channelHandlerContext.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(">>>>>>>>>>> child-rpc provider netty server caught exception", cause);
        ctx.close();
    }
}

public class RpcInvokerHandler {
    public static Map<String, Object> serviceMap = new HashMap<String, Object>();
    public static RpcResponse invokeService(RpcRequest request) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        Object serviceBean = serviceMap.get(request.getClassName());

        RpcResponse response = new RpcResponse();
        response.setRequestId(request.getRequestId());
        try {
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = request.getMethodName();
            Class<?>[] parameterTypes = request.getParameterTypes();
            Object[] parameters = request.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            Object result = method.invoke(serviceBean, parameters);

            response.setResult(result);
        } catch (Throwable t) {
            t.printStackTrace();
            response.setError(t);
        }
        return response;
    }
}

client 端主要业务实现是等待 server 响应返回。代码比较简单就不贴代码了,详情请看下面给出的 github 链接。

RPC 服务端与客户端启动

因为服务端与客户端启动都是 Netty 的模板代码,因为篇幅原因就不贴出来了,感兴趣的伙伴请移步 造个轮子—RPC动手实现

小结

因为只是为了理解 RPC 的本质,所以在实现细节上还有好多没有仔细去雕琢的地方。不过 RPC 的目的就是允许像调用本地服务一样调用远程服务,对调用者透明,于是我们使用了动态代理。并使用 Netty 的 handler 发送数据和响应数据,总的来说该框架实现了简单的 RPC 调用。代码比较简单,主要是思路,以及了解 RPC 底层的实现。

参考文章

作 者:haifeiWu 原文链接:http://www.hchstudio.cn/article/2018/b674/ 版权声明:非特殊声明均为本站原创作品,转载时请注明作者和原文链接。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏androidBlog

一步步拆解 LeakCanary

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/de...

961
来自专栏阿杜的世界

Spring实战3:装配bean的进阶知识主要内容:

在装配bean—依赖注入的本质一文中,我们探讨了Spring的三种管理bean的方式:自动装配、基于JavaConfig、基于XML文件。这篇文字将探讨一些Sp...

892
来自专栏JackieZheng

照虎画猫写自己的Spring——依赖注入

前言 上篇《照虎画猫写自己的Spring》从无到有讲述并实现了下面几点 声明配置文件,用于声明需要加载使用的类 加载配置文件,读取配置文件 解析配置文件,需要将...

2038
来自专栏hotqin888的专栏

bootstrap treeview 增删改的正确姿势

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/hotqin888/article/det...

4723
来自专栏Hongten

java开发_UUID(Universally Unique Identifier,全局唯一标识符)和GUID(Globally Unique Identifier,全球唯一标识符)

GUID: 即Globally Unique Identifier(全球唯一标识符) 也称作 UUID(Universally Unique IDentifie...

1131
来自专栏犀利豆的技术空间

徒手撸框架--实现 RPC 远程调用

微服务已经是每个互联网开发者必须掌握的一项技术。而 RPC 框架,是构成微服务最重要的组成部分之一。趁最近有时间。又看了看 dubbo 的源码。dubbo 为了...

1532
来自专栏郭霖

Android Volley完全解析(三),定制自己的Request

经过前面两篇文章的学习,我们已经掌握了Volley各种Request的使用方法,包括StringRequest、JsonRequest、ImageRequest...

2346
来自专栏大内老A

IoC+AOP的简单实现

对EnterLib有所了解的人应该知道,其中有一个名叫Policy Injection的AOP框架;而整个EnterLib完全建立在另一个叫作Unity的底层框...

2129
来自专栏Java呓语

单元测试以及JUnit框架解析

我们都有个习惯,常常不乐意去写个简单的单元测试程序来验证自己的代码。对自己的程序一直非常有自信,或存在侥幸心理每次运行通过后就直接扔给测试组测试了。然而每次测试...

1562
来自专栏Java架构师学习

带你深入了解Java线程中的那些事

引言 说到Thread大家都很熟悉,我们平常写并发代码的时候都会接触到,那么我们来看看下面这段代码是如何初始化以及执行的呢? public class Thre...

3358

扫码关注云+社区

领取腾讯云代金券