前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RPC项目记录二期 - Netty替换socket,实现网络传输,解编码器,序列化器

RPC项目记录二期 - Netty替换socket,实现网络传输,解编码器,序列化器

作者头像
宇宙无敌暴龙战士之心悦大王
发布2022-02-17 15:43:42
4640
发布2022-02-17 15:43:42
举报
文章被收录于专栏:kwaikwai

内容

使用netty替代原先的socket网络传输,使效率更高

优点

netty是nio,socket是bio。

其实就是nio比bio好在哪。

出现的问题

基础

需要实现commonEncoder,CommonDecoder,NettyClientHandler。

这里用的是责任链模式,要层层处理后才可以给下一层,这里需要实现解码器,编码器,数据处理器。

发送rpc请求

这里我们是使用channel进行发送的,因为这是非阻塞的,所以结果会直接返回,导致接受不到结果。

这里我们需要用到attributeKey,netty常用解决粘包的代码。

代码语言:javascript
复制
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
RpcResponse rpcResponse = channel.attr(key).get();

通过这种方式获得全局可见的返回结果,在获得返回结果 RpcResponse 后,将这个对象以 key 为 rpcResponse 放入 ChannelHandlerContext 中,这里就可以立刻获得结果并返回,我们会在 NettyClientHandler 中看到放入的过程。

自定义协议和解编码器

自定义协议

首先是 4 字节魔数,表识一个协议包。接着是 Package Type,标明这是一个调用请求还是调用响应,Serializer Type 标明了实际数据使用的序列化器,这个服务端和客户端应当使用统一标准;Data Length 就是实际数据的长度,设置这个字段主要防止粘包,最后就是经过序列化后的实际数据,可能是 RpcRequest 也可能是 RpcResponse 经过序列化后的字节,取决于 Package Type。

解编码器

1,编码器 commonEncoder

代码语言:javascript
复制
public class CommonEncoder extends MessageToByteEncoder {

    private static final int MAGIC_NUMBER = 0xCAFEBABE;

    private final CommonSerializer serializer;

    public CommonEncoder(CommonSerializer serializer) {
        this.serializer = serializer;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        out.writeInt(MAGIC_NUMBER);
        if(msg instanceof RpcRequest) {
            out.writeInt(PackageType.REQUEST_PACK.getCode());
        } else {
            out.writeInt(PackageType.RESPONSE_PACK.getCode());
        }
        out.writeInt(serializer.getCode());
        byte[] bytes = serializer.serialize(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}

MessageToByteEncoder:就是将数据(要发送的对象)转化成字节。

encode:按照协议,将字段一个个传入管道里。

这里要注意的:实现编码器,必须要传入一个选定的序列化容器。

2,解码器 commonDecoder

代码语言:javascript
复制
public class CommonDecoder extends ReplayingDecoder {

    private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);
    private static final int MAGIC_NUMBER = 0xCAFEBABE;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magic = in.readInt();
        if(magic != MAGIC_NUMBER) {
            logger.error("不识别的协议包: {}", magic);
            throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
        }
        int packageCode = in.readInt();
        Class<?> packageClass;
        if(packageCode == PackageType.REQUEST_PACK.getCode()) {
            packageClass = RpcRequest.class;
        } else if(packageCode == PackageType.RESPONSE_PACK.getCode()) {
            packageClass = RpcResponse.class;
        } else {
            logger.error("不识别的数据包: {}", packageCode);
            throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
        }
        int serializerCode = in.readInt();
        CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
        if(serializer == null) {
            logger.error("不识别的反序列化器: {}", serializerCode);
            throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
        }
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        Object obj = serializer.deserialize(bytes, packageClass);
        out.add(obj);
    }

ReplayingDecoder :需要实现这个,也就是将序列化的字节码转换成对象。

这里逻辑很简单:从管道里取出每个字段,然后判断是否合理或存在(比如序列化器没有对应的反序列化器的话,会抛出异常)。

细节就是:先判断协议头是否合理,全部合理则把序列化数据反序列化完事。

序列化器

kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。

json序列化器:

这里使用Jackson作为json序列化工具。

json反序列化因为是object,容易出现错误,所以需要写个新函数来一一对照。

代码语言:javascript
复制
/*
        这里由于使用JSON序列化和反序列化Object数组,无法保证反序列化后仍然为原实例类型
        需要重新判断处理
     */
    private Object handleRequest(Object obj) throws IOException {
        RpcRequest rpcRequest = (RpcRequest) obj;
        for(int i = 0; i < rpcRequest.getParamTypes().length; i ++) {
            Class<?> clazz = rpcRequest.getParamTypes()[i];
            if(!clazz.isAssignableFrom(rpcRequest.getParameters()[i].getClass())) {
                byte[] bytes = objectMapper.writeValueAsBytes(rpcRequest.getParameters()[i]);
                rpcRequest.getParameters()[i] = objectMapper.readValue(bytes, clazz);
            }
        }
        return rpcRequest;
    }

kryo序列化器:

kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。

kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。

kyro不是线程安全的!所以我采用ThreadLocal方式的kyro。

实现方法就是正常注册kryo就行了。

NettyServerHandler 和 NettyClientHandler

这里是netty的最顶部和最尾部,不需要和序列化接触,直接处理rpcrequest和rpcresponse就可以了。

NettyClientHandler把结果传入编码器就行。

NettyServerHandler把结果传出去就行。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-02-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 内容
  • 优点
  • 出现的问题
  • 基础
    • 发送rpc请求
      • 自定义协议和解编码器
      相关产品与服务
      文件存储
      文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档