专栏首页CodeGuide | 程序员编码指南netty案例,netty4.1中级拓展篇十《Netty接收发送多种协议消息类型的通信处理方案》

netty案例,netty4.1中级拓展篇十《Netty接收发送多种协议消息类型的通信处理方案》

前言介绍

在我们实际做应用级开发的过程中,客户端与服务端需要发送多种消息类型,比如一个聊天室场景包括的消息类型;登录验证、组建群聊、发送消息、退出登录等等,但如果我们都是用统一对象加if判断来分别转换,那么对后期的维护成本就会非常大,这样的代码方式也不是一个面向对象开发的思维。面向对象的开发思路,经常会把很多if、switch等逻辑抽象成对应的接口和抽象类,以及加入工厂方式对服务进行动态编排。

那么我们在这里也同样需要定义一个抽象类,抽象类里包含了一个必须实现的标识性属性,用来编码解码时提取标识,找到对应的处理类进行操作。这样我们就可以不断的去扩展我们需要的不同维度的消息处理的Handler,在这个案例里我们模拟了;demo01、demo02、demo03三组消息处理handler,他们都统一继承抽象类Packet,并实现里面的getCommand方法。另外可以在这个抽象类中加入一些其他属性,包括;版本、校验、加密等,可以更加方便的用于处理各类通用非业务属性逻辑行为。

开发环境

1、jdk1.8【jdk1.7以下只能部分支持netty】 2、Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】

代码示例

itstack-demo-netty-2-10
└── src
    ├── main
    │   └── java
    │       └── org.itstack.demo.netty
    │           ├── client
    │           │   ├── MyChannelInitializer.java
    │           │   ├── MyClientHandler.java
    │           │   └── NettyClient.java
    │           ├── codec
    │           │   ├── ObjDecoder.java
    │           │   └── ObjEncoder.java
    │           ├── domain
    │           │   ├── protocol
    │           │   │   ├── Command.java
    │           │   │   ├── Packet.java
    │           │   │   └── PacketClazzMap.java
    │           │   ├── MsgDemo01.java
    │           │   ├── MsgDemo02.java
    │           │   └── MsgDemo03.java
    │           ├── server
    │           │   ├── handler
    │           │   │   ├── MsgDemo01Handler.java
    │           │   │   ├── MsgDemo02Handler.java
    │           │   │   └── MsgDemo03Handler.java
    │           │   ├── MyChannelInitializer.java
    │           │   └── NettyServer.java
    │           └── util
    │               ├── MsgUtil.java
    │               └── SerializationUtil.java
    │
    └── test
         └── java
             └── org.itstack.demo.test
                 └── ApiTest.java

只展示讲解重点代码块,全部代码可以关注公众号:bugstack虫洞栈,回复:netty源码,获取!

client/NettyClient.java | 增加了模拟发送不同类型的消息

public class NettyClient {

    public static void main(String[] args) {
        new NettyClient().connect("127.0.0.1", 7397);
    }

    private void connect(String inetHost, int inetPort) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new MyChannelInitializer());
            ChannelFuture f = b.connect(inetHost, inetPort).sync();
            System.out.println("itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}");

            //测试消息,分别发放demo01、demo02、demo03
            f.channel().writeAndFlush(MsgUtil.buildMsgDemo01(f.channel().id().toString(),"你好,消息体MsgDemo01,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,欢迎关注我获取案例源码。"));
            f.channel().writeAndFlush(MsgUtil.buildMsgDemo02(f.channel().id().toString(),"你好,消息体MsgDemo02,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,欢迎关注我获取案例源码。"));
            f.channel().writeAndFlush(MsgUtil.buildMsgDemo03(f.channel().id().toString(),"你好,消息体MsgDemo03,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,欢迎关注我获取案例源码。"));

            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

codec/ObjDecoder.java | 改造解码器,通过读取指令取的对应的类来解码

public class ObjDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte command = in.readByte();  //读取指令
        byte[] data = new byte[dataLength - 1]; //指令占了一位,剔除掉
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, PacketClazzMap.packetTypeMap.get(command)));
    }

}

codec/ObjEncoder.java | 改造编码器,在将对象序列化byte[]后,添加指令

public class ObjEncoder extends MessageToByteEncoder<Packet> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Packet in, ByteBuf out) {
        byte[] data = SerializationUtil.serialize(in);
        out.writeInt(data.length + 1);
        out.writeByte(in.getCommand()); //添加指令
        out.writeBytes(data);
    }

}

domain/protocol/Command.java | 定义指令组

public interface Command {

    Byte Demo01 = 1;   //测试01
    Byte Demo02 = 2;   //测试02
    Byte Demo03 = 3;   //测试03

}

domain/protocol/Packet.java | 定义协议包头,所以的通信消息都继承这个类

public abstract class Packet {

    /**
     * 获取协议指令
     * @return 返回指令值
     */
    public abstract Byte getCommand();

}

domain/protocol/PacketClazzMap.java | 方便获取对应类标识的Map结构

import org.itstack.demo.netty.domain.MsgDemo01;
import org.itstack.demo.netty.domain.MsgDemo02;
import org.itstack.demo.netty.domain.MsgDemo03;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PacketClazzMap {

    public final static Map<Byte, Class<? extends Packet>> packetTypeMap = new ConcurrentHashMap<>();

    static {
        packetTypeMap.put(Command.Demo01, MsgDemo01.class);
        packetTypeMap.put(Command.Demo02, MsgDemo02.class);
        packetTypeMap.put(Command.Demo03, MsgDemo03.class);
    }

}

domain/MsgDemo01.java | 通信消息定义,三个类似展示其中一个

public class MsgDemo01 extends Packet {

    private String channelId;
    private String demo01;

    public MsgDemo01(String channelId, String demo01) {
        this.channelId = channelId;
        this.demo01 = demo01;
    }

    public String getChannelId() {
        return channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public String getDemo01() {
        return demo01;
    }

    public void setDemo01(String demo01) {
        this.demo01 = demo01;
    }

    @Override
    public Byte getCommand() {
        return Command.Demo01;
    }

}

server/handler/MsgDemo01Handler.java | 用于处理消息的handler,三个类似展示其中一个

public class MsgDemo01Handler extends SimpleChannelInboundHandler<MsgDemo01> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MsgDemo01 msg) throws Exception {
        System.out.println("\r\n> msg handler ing ...");
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收消息的处理器:" + this.getClass().getName());
        System.out.println("channelId:" + msg.getChannelId());
        System.out.println("消息内容:" + msg.getDemo01());
    }

}

server/MyChannelInitializer.java | 在这里与以往不同,里面包含了三组消息处理handler,如果有更多可以依次添加

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) {
        //对象传输处理[解码]
        channel.pipeline().addLast(new ObjDecoder());
        // 在管道中添加我们自己的接收数据实现方法
        channel.pipeline().addLast(new MsgDemo01Handler());
        channel.pipeline().addLast(new MsgDemo02Handler());
        channel.pipeline().addLast(new MsgDemo03Handler());
        //对象传输处理[编码]
        channel.pipeline().addLast(new ObjEncoder());
    }

}

测试结果

启动NettyServer

itstack-demo-netty server start done. 

> msg handler ing ...
2019-09-08 11:16:00 接收消息的处理器:org.itstack.demo.netty.server.handler.MsgDemo01Handler
channelId:a21401f4
消息内容:你好,消息体MsgDemo01,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,欢迎关注我获取案例源码。

> msg handler ing ...
2019-09-08 11:16:00 接收消息的处理器:org.itstack.demo.netty.server.handler.MsgDemo02Handler
channelId:a21401f4
消息内容:你好,消息体MsgDemo02,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,欢迎关注我获取案例源码。

> msg handler ing ...
2019-09-08 11:16:00 接收消息的处理器:org.itstack.demo.netty.server.handler.MsgDemo03Handler
channelId:a21401f4
消息内容:你好,消息体MsgDemo03,我是https://bugstack.cn博主,付政委。这是我的公众号<bugstack虫洞栈>,欢迎关注我获取案例源码。

Process finished with exit code -1

启动NettyClient

链接报告开始
itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}
链接报告信息:本客户端链接到服务端。channelId:a21401f4
链接报告IP:127.0.0.1
链接报告Port:51714
链接报告完毕
异常信息:
远程主机强迫关闭了一个现有的连接。
断开链接/127.0.0.1:51714

Process finished with exit code -1

本文分享自微信公众号 - bugstack虫洞栈(bugstack),作者:付政委

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-09-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • DDD专题案例三《领域驱动设计架构基于SpringCloud搭建微服务》

    微服务不是泥球小单体,而是具备更加清晰职责边界的完整一体的业务功能服务。领域驱动设计的思想通过Domain的功能域设计,可以把核心功能与支撑功能很好的区分开。而...

    小傅哥
  • DDD专题案例一《初识领域驱动设计DDD落地方案》

    DDD(Domain-Driven Design 领域驱动设计)是由Eric Evans最先提出,目的是对软件所涉及到的领域进行建模,以应对系统规模过大时引起的...

    小傅哥
  • netty案例,netty4.1基础入门篇九《自定义编码解码器,处理半包、粘包数据》

    在实际应用场景里,只要是支持sokcet通信的都可以和Netty交互,比如中继器、下位机、PLC等。这些场景下就非常需要自定义编码解码器,来处理字节码传输,并控...

    小傅哥
  • 通过hiveserver远程服务构建hive web查询分析工具

    (1)hive 三种启动方式及用途,本文主要关注通过hiveserver(可jdbc连接)的方式启动  1, hive  命令行模式,直接输入/hive/...

    用户1177713
  • Java项目中使用最多的排名前100的类,你用的最多的是哪些

    从事Java软件开发工作很大程度是要利用各种类库的api,有组织曾经从10000个开放源码的Java项目中,统计API类的使用频率;下面的列表显示了前100名。...

    用户1289394
  • SpringBoot开发案例从0到1构建分布式秒杀系统

    最近,被推送了不少秒杀架构的文章,忙里偷闲自己也总结了一下互联网平台秒杀架构设计,当然也借鉴了不少同学的思路。俗话说,脱离案例讲架构都是耍流氓,最终使用Spri...

    小柒2012
  • 线上服务 CPU 又 100% 啦?一键定位 so easy!

    来源:my.oschina.net/leejun2005/blog/1524687

    芋道源码
  • java9导出运行springboot的精简版jre

    为了分发一个springboot项目,在不考虑目标机器是否有jre的情况下,携带一个jre环境是一个选择。本文就尝试从原生的jdk 9.0.1 精简一个jre供...

    pollyduan
  • 第43节:Java学前要点

    学习Java,有人推荐去培训,有人说没用,其实有钱的,不知道如何学,或者逼不得已去的就可以,也有人自己为了不花这些钱,而选择自学,我觉得也行。

    达达前端
  • 卸载CentOS7-x64自带的OpenJDK并安装Sun的JDK7的方法

        java version "1.6.0"    OpenJDK Runtime Environment (build 1.6.0-b09)    Ope...

    庞小明

扫码关注云+社区

领取腾讯云代金券