前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty实现心跳

Netty实现心跳

作者头像
付威
发布2020-02-17 11:29:53
1.3K0
发布2020-02-17 11:29:53
举报
心跳机制

心跳机制是常用的一个健康监测的机制,说白了就是每隔一段时间向服务器发送一个心跳的报文,服务收到报文后,就认为当前的客户端在活动的状态,否则会进入异常的机制,比如说主从切换

既然存在一个通信,就一定会有服务端和客户端。服务端开启监听,客户端发起心跳报文,然后服务就再次响应。

系统的设计
  1. 消息的类型 在服务端和客户端进行通信的时候,需要区分消息的类型,根据消息的类型分别进行不同的处理。
代码语言:javascript
复制
public enum MessageType {
  SERVICE_REQ((byte) 0),/*业务请求消息*/
  SERVICE_RESP((byte) 1), /*业务应答消息*/
  ONE_WAY((byte) 2), /*无需应答的消息*/
  LOGIN_REQ((byte) 3), /*登录请求消息*/
  LOGIN_RESP((byte) 4), /*登录响应消息*/
  HEARTBEAT_REQ((byte) 5), /*心跳请求消息*/
  HEARTBEAT_RESP((byte) 6);/*心跳应答消息*/
  private byte code;

  MessageType(byte code) {
       this.code = code;
  }
     
  public byte getValue() {
       return code;
  }

  public static MessageType getMessageType(String typeName){
       for (MessageType mt :MessageType.values()) {
            if(mt.toString().equals(typeName.trim())){
                 return mt;
            }

       }
       return null;
  }
  }
 
  1. 内容的类型 在设计这个传输的模型的时候考虑的文件的传输(当然也可以作为消息的类型),所以还需要定义一个内容的类型
代码语言:javascript
复制
 public enum ContentType {
       Default((byte) 0),
       File((byte) 1),
       Other((byte) 2);
       private byte code;
       ContentType(byte code) {
            this.code = code;
       }
          
       public byte getValue() {
            return code;
       }
          
       public static ContentType getContentType(String typeName){
            for (ContentType mt :ContentType.values()) {
                 if(mt.toString().equals(typeName.trim())){
                      return mt;
                 }
                    
            }
            return null;
       }
  }
 
  1. 消息头 消息头包含了消息的认证信息和长度,用来认证信息的合法来源和消息的截取。定义如下:
代码语言:javascript
复制
public class MessageHead {
       private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志
       private int length;//包的长度
       private String token;//认证的Token,可以设置时效
       private LocalDateTime createDate;
       private String messageId;
       private MessageType messageType;
       private ContentType  contentType;
  }
 
  1. 自定义传输Encoder和Decoder 在Netty中几乎所有的业务逻辑在Handler中,Encoder和Decoder是特殊的handler,用于对消息的编码和反编码。类似序列号和反序列号。
代码语言:javascript
复制
public class RzEncoder extends MessageToByteEncoder<Message> {
       @Override
       protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
            // TODO Auto-generated method stub
            // 写入开头的标志
            out.writeInt(msg.getHeader().getHeadData());
            // 写入包的的长度
            out.writeInt(msg.getContent().length);
            /** * token定长50个字节 * 第一个参数 原数组 * 第二个参数 原数组位置 * 第三个参数 目标数组 * 第四个参数 目标数组位置 * 第五个参数 copy多少个长度 */
            byte[] indexByte = msg.getHeader().getToken().getBytes();
            writeByte(out, indexByte, 50);
               
               
            byte[] createTimeByte = msg.getHeader().getCreateDate().toString().getBytes();
            writeByte(out, createTimeByte, 50);
               
            byte[] idByte = msg.getHeader().getMessageId().getBytes();
            writeByte(out, idByte, 50);
               
            byte[] msgType = new byte[]{msg.getHeader().getMessageType().getValue()};
            out.writeBytes(msgType);
            byte[] contentType = new byte[]{msg.getHeader().getContentType().getValue()};
            out.writeBytes(contentType);
          
               
            out.writeBytes(msg.getContent());
               
       }
          
       private void writeByte(ByteBuf out, byte[] bytes, int length) {
            byte[] writeArr = new byte[length];
            /** * * 第一个参数 原数组 * 第二个参数 原数组位置 * 第三个参数 目标数组 * 第四个参数 目标数组位置 * 第五个参数 copy多少个长度 */
            System.arraycopy(bytes, 0, writeArr, 0, bytes.length > writeArr.length ? writeArr.length : bytes.length);
            out.writeBytes(writeArr);
       }
          
       private void writeByte(ByteBuf out, String content, int length) {
            if (StringUtils.isEmpty(content)) {
                 content = "";
            }
            writeByte(out, content.getBytes(), length);
       }
          
  }
  
  public class RzDecoder extends ByteToMessageDecoder {
       private int BASE_LENGTH = 4 + 4 + 50 + 50 + 50 + 1 +1 ;//协议头 类型 int+length 4个字节+令牌和 令牌生成时间50个字节
       private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志
          
       @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
            // 刻度长度必须大于基本长度
            if (buffer.readableBytes() >= BASE_LENGTH) {
                 /** * 粘包 发送频繁 可能多次发送黏在一起 需要考虑 不过一个客户端发送太频繁也可以推断是否是攻击 */
                 //防止soket流攻击。客户端传过来的数据太大不合理
                 if (buffer.readableBytes() > 1024*1024*10) {
                      buffer.skipBytes(buffer.readableBytes());
                         
                 }
            }
            int beginIndex;//记录包开始位置
            while (true) {
                 // 获取包头开始的index
                 beginIndex = buffer.readerIndex();
                 //如果读到开始标记位置 结束读取避免拆包和粘包
                 if (buffer.readInt() == headData) {
                      break;
                 }
                    
                 //初始化读的index为0
                 buffer.resetReaderIndex();
                 // 当略过,一个字节之后,
                 //如果当前buffer数据小于基础数据 返回等待下一次读取
                 if (buffer.readableBytes() < BASE_LENGTH) {
                      return;
                 }
            }
            // 消息的长度
            int length = buffer.readInt();
            // 判断请求数据包数据是否到齐
            if ((buffer.readableBytes() - 100) < length) {
                 //没有到期 返回读的指针 等待下一次数据到期再读
                 buffer.readerIndex(beginIndex);
                 return;
            }
            //读取令牌
            byte[] tokenByte = new byte[50];
            buffer.readBytes(tokenByte);
               
               
            //读取令牌生成时间
            byte[] createDateByte = new byte[50];
            buffer.readBytes(createDateByte);
               
            //读取Id
            byte[] messageIdByte = new byte[50];
            buffer.readBytes(messageIdByte);
               
            byte[] messageTypeByte = new byte[1];
            buffer.readBytes(messageTypeByte);
            byte[] contentTypeByte = new byte[1];
            buffer.readBytes(contentTypeByte);
            ContentType contentType = ContentType.values()[contentTypeByte[0]];

            //读取content
            byte[] data = new byte[length];
            buffer.readBytes(data);
            MessageHead head = new MessageHead();
            head.setHeadData(headData);
            head.setToken(new String(tokenByte).trim());
            head.setCreateDate(LocalDateTime.parse(new String(createDateByte).trim()));
            head.setLength(length);
            head.setMessageId(new String(messageIdByte).trim());
            head.setMessageType(MessageType.values()[messageTypeByte[0]]);
            head.setContentType(contentType);
            Message message = new Message(head, data);
            //认证不通过
            if (!message.authorization(message.buidToken())) {
                 ctx.close();
                 return;
            }
            out.add(message);
            buffer.discardReadBytes();//回收已读字节
       }
  }

 
  1. 心跳的发送 心跳的发送就只剩下生成消息和发送了,此处略。

(本文完)

作者:付威 博客地址:http://blog.laofu.online

如有任何知识产权、版权问题或理论错误,还请指正。 本文是付威的网络博客原创,自由转载-非商用-非衍生-保持署名,请遵循:创意共享3.0许可证

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-02-01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 心跳机制
  • 系统的设计
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档