前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >编程思想:如何设计一个好的通信网络协议

编程思想:如何设计一个好的通信网络协议

作者头像
用户1161731
发布2020-04-01 16:37:06
5.5K0
发布2020-04-01 16:37:06
举报
文章被收录于专栏:木宛城主

当网络中两个进程需要通信时,我们往往会使用 Socket 来实现。Socket 都不陌生。当三次握手成功后,客户端与服务端就能通信,并且,彼此之间通信的数据包格式都是二进制,由 TCP/IP 协议负责传输。

当客户端和服务端取得了二进制数据包后,我们往往需要『萃取』出想要的数据,这样才能更好的执行业务逻辑。所以,我们需要定义好数据结构来描述这些二进制数据的格式,这就是通信网络协议。简单讲,就是需要约定好二进制数据包中每一段字节的含义,比如从第 n 字节开始的 m 长度是核心数据,有了这样的约定后,我们就能解码出想要的数据,执行业务逻辑,这样我们就能畅通无阻的通信了。

网络协议的设计

概要划分

一个最基本的网络协议必须包含

  • 数据的长度
  • 数据

了解 TCP 协议的同学一定听说过粘包、拆包 这两个术语。因为TCP协议是数据流协议,它的底层根据二进制缓冲区的实际情况进行包的划分。所以,不可避免的会出现粘包,拆包 现象 。为了解决它们,我们的网络协议往往会使用一个 4 字节的 int 类型来表示数据的大小。比如,Netty 就为我们提供了 LengthFieldBasedFrameDecoder 解码器,它可以有效的使用自定义长度帧来解决上述问题。

同时一个好的网络协议,还会将动作和业务数据分离。试想一下, HTTP 协议的分为请求头,请求体——

  • 请求头:定义了接口地址、Http MethodHTTP 版本
  • 请求体:定义了需要传递的数据

这就是一种分离关注点的思想。所以自定义的网络协议也可以包含:

  • 动作指令:比如定义 code 来分门别类的代表不同的业务逻辑
  • 序列化算法:描述了 JAVA 对象和二进制之间转换的形式,提供多种序列化/反序列化方式。比如 jsonprotobuf 等等,甚至是自定义算法。比如:rocketmq 等等。

同时,协议的开头可以定义一个约定的魔数。这个固定值(4字节),一般用来判断当前的数据包是否合法。比如,当我们使用 telnet 发送错误的数据包时,很显然,它不合法,会导致解码失败。所以,为了减轻服务器的压力,我们可以取出数据包的前4个字节与固定的魔数对比,如果是非法的格式,直接关闭连接,不继续解码。

网络协议结构如下所示

代码语言:javascript
复制
+--------------+-----------+------------+-----------+----------+
| 魔数(4)       | code(1)   |序列化算法(1) |数据长度(4) |数据(n)   |
+--------------+-----------+------------+-----------+----------+ 

RocketMQ 通信网络协议的实现

RocketMQ 网络协议

这一小节,我们从RocketMQ 中,分析优秀通信网络协议的实现。RocketMQ 项目中,客户端和服务端的通信是基于 Netty 之上构建的。同时,为了更加有效的通信,往往需要对发送的消息自定义网络协议。

RocketMQ 的网络协议,从数据分类的角度上看,可分为两大类

  • 消息头数据(Header Data)
  • 消息体数据(Body Data)

从左到右

  • 第一段:4 个字节整数,等于2、3、4 长度总和
  • 第二段:4 个字节整数,等于3 的长度。特别的 byte[0] 代表序列化算法,byte[1~3]才是真正的长度
  • 第三段:代表消息头数据,结构如下
代码语言:javascript
复制
{
    "code":0,
    "language":"JAVA",
    "version":0,
    "opaque":0,
    "flag":1,
    "remark":"hello, I am respponse /127.0.0.1:27603",
    "extFields":{
        "count":"0",
        "messageTitle":"HelloMessageTitle"
    }
}
  • 第四段:代表消息体数据

RocketMQ 消息头协议详细如下:

Header 字段名

类型

Request

Response

code

整数

请求操作代码,请求接收方根据不同的代码做不同的操作

应答结果代码,0表示成功,非0表示各种错误代码

language

字符串

请求发起方实现语言,默认JAVA

应答接收方实现语言

version

整数

请求发起方程序版本

应答接收方程序版本

opaque

整数

请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用

应答方不做修改,直接返回

flag

整数

通信层的标志位

通信层的标志位

remark

字符串

传输自定义文本信息

错误详细描述信息

extFields

HashMap<String,String>

请求自定义字段

应答自定义字段

编码过程

RocketMQ 的通信模块是基于 Netty的。通过定义 NettyEncoder 来实现对每一个 Channel的 出栈数据进行编码,如下所示:

代码语言:javascript
复制
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
           ...
        }
    }
}

其中,核心的编码过程位于 RemotingCommand 对象中,encodeHeader 阶段,需要统计出消息总长度,即:

  • 定义消息头长度,一个整数表示:占4个字节
  • 定义消息头数据,并计算其长度
  • 定义消息体数据,并计算其长度
  • 额外再加 4是因为需要加入消息总长度,一个整数表示:占4个字节
代码语言:javascript
复制
public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> 消息头长度,一个整数表示:占4个字节
    int length = 4;

    // 2> 消息头数据
    byte[] headerData;
    headerData = this.headerEncode();
    // 再加消息头数据长度
    length += headerData.length;

    // 3> 再加消息体数据长度
    length += bodyLength;
    // 4> 额外加 4是因为需要加入消息总长度,一个整数表示:占4个字节
    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // 5> 将消息总长度加入 ByteBuffer
    result.putInt(length);

    // 6> 将消息的头长度加入 ByteBuffer
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // 7> 将消息头数据加入 ByteBuffer
    result.put(headerData);

    result.flip();

    return result;
}

其中,encode 阶段会将 CommandCustomHeader 数据转换 HashMap<String,String>,方便序列化

代码语言:javascript
复制
public void makeCustomHeaderToNet() {
    if (this.customHeader != null) {
        Field[] fields = getClazzFields(customHeader.getClass());
        if (null == this.extFields) {
            this.extFields = new HashMap<String, String>();
        }

        for (Field field : fields) {
            if (!Modifier.isStatic(field.getModifiers())) {
                String name = field.getName();
                if (!name.startsWith("this")) {
                    Object value = null;
                    try {
                        field.setAccessible(true);
                        value = field.get(this.customHeader);
                    } catch (Exception e) {
                        log.error("Failed to access field [{}]", name, e);
                    }

                    if (value != null) {
                        this.extFields.put(name, value.toString());
                    }
                }
            }
        }
    }
}

特别的,消息头序列化支持两种算法:

  • JSON
  • RocketMQ
代码语言:javascript
复制
private byte[] headerEncode() {
    this.makeCustomHeaderToNet();
    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
        return RocketMQSerializable.rocketMQProtocolEncode(this);
    } else {
        return RemotingSerializable.encode(this);
    }
}

这儿需要值得注意的是,encode阶段将当前 RPC 类型和 headerData长度编码到一个 byte[4] 数组中,byte[0] 位序列化类型。

代码语言:javascript
复制
public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
}

其中,通过与运算 & 0xFF 取低八位数据。

所以, 最终 length 长度等于序列化类型 + header length + header data + body data 的字节的长度。

解码过程

RocketMQ 解码通过NettyDecoder来实现,它继承自 LengthFieldBasedFrameDecoder,其中调用了父类LengthFieldBasedFrameDecoder的构造函数

代码语言:javascript
复制
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);

这些参数设置4个字节代表 length总长度,同时解码时跳过最开始的4个字节:

代码语言:javascript
复制
frame = (ByteBuf) super.decode(ctx, in);

所以,得到的 frame= 序列化类型 + header length + header data + body data 。解码如下所示:

代码语言:javascript
复制
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    //总长度
    int length = byteBuffer.limit();
    //原始的 header length,4位
    int oriHeaderLen = byteBuffer.getInt();
    //真正的 header data 长度。忽略 byte[0]的 serializeType
    int headerLength = getHeaderLength(oriHeaderLen);

    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
}

private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
    switch (type) {
        case JSON:
            RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
            resultJson.setSerializeTypeCurrentRPC(type);
            return resultJson;
        case ROCKETMQ:
            RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
            resultRMQ.setSerializeTypeCurrentRPC(type);
            return resultRMQ;
        default:
            break;
    }

    return null;
}

其中,getProtocolType,右移 24位,拿到 serializeType

代码语言:javascript
复制
public static SerializeType getProtocolType(int source) {
    return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}

getHeaderLength 拿到 0-24 位代表的 headerData length:

代码语言:javascript
复制
public static int getHeaderLength(int length) {
    return length & 0xFFFFFF;
}

小结

对于诸多中间件而言,底层的网络通信模块往往会使用 NettyNetty 提供了诸多的编解码器,可以快速方便的上手。本文从如何设计一个网络协议入手,最终切入到 RocketMQ 底层网络协议的实现。可以看到,它并不复杂。仔细研读几遍变能理解其奥义。具体参考类NettyEncoderNettyDecoderRemotingCommand

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-03-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 网络协议的设计
  • RocketMQ 通信网络协议的实现
  • 小结
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档