前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty中级篇

Netty中级篇

作者头像
贪挽懒月
发布2020-08-11 15:33:06
1K0
发布2020-08-11 15:33:06
举报
文章被收录于专栏:JavaEE

一、Netty核心模块

  • BootStrap:客户端程序的启动引导类
  • ServerBootStrap:服务端程序的启动引导类

它们的常用方法有:

代码语言:javascript
复制
- group:设置线程组
- channel:指定通道的实现类
- option:给channel添加配置
- childOption:给接收到的channel添加配置
- handler:设置bossGroup的handler
- childHandler:设置workGroup的handler
  • Selector:用来接收注册的channel的。当往selector中注册channel后,selector就会自动不断地轮询这些channel是否有就绪的IO事件。
  • ChannelHandler:是一个接口,处理IO事件或者拦截IO事件,也就是说你拿到channel后想干的事情都通过channelHandler来完成。
  • ChannelPipeline:是一个handler的集合,负责处理和拦截入站事件和出站操作。一个channel包含了一个ChannelPipeline,而ChannelPipeline又维护了一个由ChannelHandlerContext组成的双向链表,并且每个ChannelHandlerContext中又关联着一个ChannelHandler。入站事件就是从链表头传递到链表尾的handler,出站事件就是从链表尾传到链表头的handler。ChannelPipeline的常用方法:
代码语言:javascript
复制
- addFirst:把handler添加到链表第一个位置
- addLast:把handler添加到链表的最后一个位置
  • ChannelHandlerContext:保存channel相关的上下文信息,同时关联了Channelhandler对象,也绑定了对应的pipeline和channel信息,方便对channelhandler进行调用。

二、用Netty实现聊天室功能

之前说过用NIO实现聊天室,现在来看看用netty如何实现聊天室。这里我将新建两个maven项目,一个服务端,一个客户端,最后可以打成jar包,服务端jar包运行在你电脑上,客户端jar包自己跑一份,还可以发给你的同事,然后就可以愉快的聊天了。

1、服务端:

  • pom.xml:引入netty的依赖,还要配置一下打包插件,不然你运行的jar包就会报“找不到主清单文件”或者没把netty依赖打包上去。
代码语言:javascript
复制
<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.50.Final</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.zhusl.study.chatroom.NettyChatRoomClient</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

客户端的pom.xml唯一的区别就是 <mainClass>换成了客户端启动类。

  • NettyChatRoomServer:
代码语言:javascript
复制
public class NettyChatRoomServer {
    
    public void run () throws Exception {
        // 创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // 配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                     .channel(NioServerSocketChannel.class)
                     .option(ChannelOption.SO_BACKLOG, 128)
                     .childOption(ChannelOption.SO_KEEPALIVE, true)
                     .childHandler(new NettyChatRoomServerInitializer());
            // 监听端口
            ChannelFuture cf = bootstrap.bind(6666).sync(); 
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        NettyChatRoomServer ncrs = new NettyChatRoomServer();
        ncrs.run();
    }
}
  • NettyChatRoomServerInitializer:
代码语言:javascript
复制
public class NettyChatRoomServerInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decode",new StringDecoder());//解码器
        pipeline.addLast("encode",new StringEncoder());//编码器
        pipeline.addLast("handler",new NettyChatRoomServerHandler());
        
    }
}
  • NettyChatRoomServerHandler:
代码语言:javascript
复制
public class NettyChatRoomServerHandler extends SimpleChannelInboundHandler<String>{

    private  static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    /**
     * 当有channel加入时执行该方法(即当有客户端连接时)
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
        System.out.println("【" + ip + "】" + "进入聊天室");
        for (Channel channel : channelGroup) {
            // 给别的客户端提醒:xxx加入群聊
            if (ctx.channel() != channel) {
                channel.writeAndFlush("【" + ip + "】" + "进入聊天室");
            }
        }
        // 将当前channel加入到channelGroup中
        channelGroup.add(ctx.channel());
    }
    
    /**
     * 当有channel删除时执行该方法(即客户端断开连接)
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
        System.out.println("【" + ip + "】" + "离开聊天室");
        for (Channel channel : channelGroup) {
            // 给别的客户端提醒:xxx加入群聊
            if (ctx.channel() != channel) {
                channel.writeAndFlush("【" + ip + "】" + "离开聊天室");
            }
        }
        // 这里不需要channelGroup.remove,会自动remove
    }
    
    /**
     * 当有数据读取时执行该方法
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
        System.out.println("【" + ip + "】" + ":" + msg);
        for (Channel channel : channelGroup) {
            // 将消息转发给别的客户端
            if (ctx.channel() != channel) {
                channel.writeAndFlush("【" + ip + "】"  + ":" + msg);
            } else {
                channel.writeAndFlush("【我】:" + msg);
            }
        }
    }
    
    /**
     * 异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

2、客户端:

  • NettyChatRoomClient:
代码语言:javascript
复制
public class NettyChatRoomClient {

    @SuppressWarnings("resource")
    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                     .channel(NioSocketChannel.class)
                     .handler(new NettyChatRoomClientInitializer());
            // 连接服务器
            ChannelFuture channelFuture = bootstrap.connect("192.168.2.36", 7777).sync();
            Channel channel = channelFuture.channel();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg);
            }
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        NettyChatRoomClient ncrc = new NettyChatRoomClient();
        ncrc.run();
    }
}
  • NettyChatRoomClientInitializer:
代码语言:javascript
复制
public class NettyChatRoomClientInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decode",new StringDecoder());//解码器
        pipeline.addLast("encode",new StringEncoder());
        pipeline.addLast("handler",new NettyChatRoomClientHandler());
    }
}
  • NettyChatRoomClientHandler:
代码语言:javascript
复制
public class NettyChatRoomClientHandler extends SimpleChannelInboundHandler<String>{
    
    /**
     * 异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 将从服务端接收到的消息打印出来
        System.out.println(msg);
    }
}

三、Netty心跳检测机制

客户端与服务端连接是否正常,需要有一个机制来检测,Netty提供了心跳检测机制。 1、Netty心跳检测案例:

  • 服务器超过3秒没有读操作,提示读空闲
  • 服务器超过5秒没有写操作,提示写空闲
  • 服务器超过7秒没有读和写,提示读写空闲

客户端和以前一样,没有变换,主要是服务端加了日志handler以及childHandler重写了一个用于检测心跳的方法userEventTriggered,服务端代码如下:

  • HeartBeatServer:
代码语言:javascript
复制
public class HeartBeatServer {

    public static void main(String[] args) throws Exception {
        // 1. 创建boss group (boss group和work group含有的子线程数默认是cpu数 * 2)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 2. 创建work group
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // 3. 创建服务端启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 4. 配置启动参数
            bootstrap.group(bossGroup, workGroup) // 设置两个线程组
                    .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
                    .handler(new LoggingHandler(LogLevel.INFO)) // 日志处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 创建通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
                            sc.pipeline().addLast(new HeartBeatServerHandler());
                        }
                    });
            // 5. 启动服务器并绑定端口
            ChannelFuture cf = bootstrap.bind(6666).sync();
            // 6. 对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
  • HeartBeatServerHandler:
代码语言:javascript
复制
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            String info = null;
            switch (event.state()) {
            case READER_IDLE:
                info = "读空闲";
                break;
            case WRITER_IDLE:
                info = "写空闲";
                break;
            case ALL_IDLE:
                info = "读写空闲";
                break;
            }
            System.out.println(ctx.channel().remoteAddress() + ":" + info);
        }
    }
}

四、WebSocket长连接开发

1、http协议和websocket协议的区别:

  • http协议:无状态、短连接、被动型。即只能由客户端发起请求,服务端给客户端响应,当服务端响应完,本次请求的生命周期就结束了。客户端没办法主动感知服务端的变化,服务端也没办法主动推送数据给客户端。比如你请求秒杀接口,秒杀接口给你返回排队中,那到底什么时候排上号了呢?客户端就得不断地循环请求获取秒杀结果的接口。
  • websocket:是基于http协议开发的,握手过程和htpp一样。所谓长连接,就是服务端和客户端可以相互感知,浏览器关闭了服务端可以感知,服务端关闭了浏览器可以感知。比如还是秒杀,如果是用websocket长连接开发的接口,你请求秒杀返回排队中,然后你不用再循环请求获取订单状态的接口,服务端和客户端会保持一个长连接,服务端可以主动把订单状态推给客户端。

2、案例代码:

  • WebSocketServer:
代码语言:javascript
复制
public class WebSocketServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup) // 设置两个线程组
                    .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
                    .handler(new LoggingHandler(LogLevel.INFO)) // 日志处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 创建通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new HttpServerCodec()); // 使用http的编码解码器
                            sc.pipeline().addLast(new ChunkedWriteHandler()); // 是以块方式写,添加ChunkedWriteHandler处理器
                            sc.pipeline().addLast(new HttpObjectAggregator(8192)); // http数据在传输的时候是分段的,用这个处理器就可聚集分段
                            // 请求的url就是:ws://localhost:6666/hello
                            sc.pipeline().addLast(new WebSocketServerProtocolHandler("/hello"));
                            sc.pipeline().addLast(new WebSocketServerHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.bind(80).sync();
            System.out.println("服务端准备好了");
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
  • WebSocketServerHandler:
代码语言:javascript
复制
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服务器收到消息:" + msg.text());
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now()) + ",msg:" + msg.text());
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded被调用:" + ctx.channel().id().asLongText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved被调用:" + ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught被调用:" + ctx.channel().id().asLongText());
        ctx.close();
    }
}

然后编写一个页面,用来发送websocket请求:

代码语言:javascript
复制
<body>
  <script type="text/javascript">
     var socket;
     if(window.WebSocket) {
         socket = new WebSocket("ws://127.0.0.1/hello");
         // 接收服务器端返回的消息,显示在responseText中
         socket.onmessage = function(ev){
             var rt = document.getElementById("responseText");
             rt.value = rt.value + "\n" + ev.data;
         }
         // 相当于开启连接
         socket.onopen = function(ev){
             var rt = document.getElementById("responseText");
             rt.value = "连接开启了";
         }
         // 连接关闭
         socket.onclose = function(ev){
             var rt = document.getElementById("responseText");
             rt.value = rt.value + "\n" + "连接关闭了";
         }
     } else {
         alert("浏览器不支持websocket");
     }
     
     function send(message){
         if (!window.socket){
             return;
         }
         if (socket.readyState == WebSocket.OPEN){
             socket.send(message);
         } else {
             alert("连接没有开启");
         }
     }
  </script>

  <form onsubmit="return false">
     <textarea name="message" style="height:300px;width: 300px"></textarea>
     <input type="button" value="发送消息" onclick="send(this.form.message.value)">
     <textarea id="responseText" style="height:300px;width: 300px"></textarea>
  </form>
</body>

访问这个页面,服务端启动或者关闭会在框框中显示出来,同样,如果客户端关闭,服务端也会在控制台打印出来。

五、protobuf

1、编解码问题: 数据在网络中是以二进制字节码传输的,发送的数据需要编码,服务端收到后需要解码。Netty提供了StringDecoder、ObjectDecoder,底层采用的是java序列化技术,java序列化本身效率较低,而且无法跨语言,所以就有了protobuf。

2、protobuf简介: 它是Google的开源项目,轻便高效的结构化数据存储格式,可用于数据的序列化,且支持跨平台跨语言,很适合做数据存储和RPC数据交换格式。

3、protobuf的使用:

  • 需求:客户端发送一个Student对象到服务器,服务器接收这个对象,并显示它的信息。

下面开始编码:

代码语言:javascript
复制
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.6.1</version>
</dependency>
  • 编写Student.proto:
代码语言:javascript
复制
syntax = "proto3"; // 版本
option java_outer_classname = "StudentPOJO"; // 外部类名
message Student { // 内部类名
   int32 id = 1; // 1不是值,而是序号
   string name = 2;
}
  • 将这个类Student.proto复制到刚解压出来的bin目录下,也就是和protoc.exe要同一个目录;
  • 在此目录打开cmd,执行如下命令:
代码语言:javascript
复制
protoc.exe --java_out=. Student.proto

执行完后会在protoc.exe所在目录生成一个StudentPOJO.java文件,这就是我们要的文件,复制到项目中。

  • 客户端的修改: NettyClientHandler中的channelActive改成这样:
代码语言:javascript
复制
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("client:" + ctx);
    // 发送student对象到服务端
    StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(666).setName("张三").build();
    ctx.writeAndFlush(student);
}

NettyClient中添加protobuf的编码器:

代码语言:javascript
复制
@Override
protected void initChannel(SocketChannel sc) throws Exception {
    // 加入protobuf的编码器
    sc.pipeline().addLast("encoder", new ProtobufEncoder());
    sc.pipeline().addLast(new NettyClientHandler());
}
  • 服务端的修改: NettyServer中添加解码器:
代码语言:javascript
复制
@Override
protected void initChannel(SocketChannel sc) throws Exception {
    // 加入解码器,指定解码对象
    sc.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
    // 传入自定义的handler
    sc.pipeline().addLast(new NettyServerHandler());
    // 在这里,可以将SocketChannel sc保存到集合中,别的线程拿到集合就可以调用channel的方法了
} 

NettyServerHandler中读取student对象:

代码语言:javascript
复制
// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 读取客户端发送的student
    StudentPOJO.Student student = (Student) msg;
    System.out.println("客户端发送的数据是:id=" + student.getId() + ", name=" + student.getName());
}

启动服务端,再启动客户端,就可以看到服务端后台打印出了如下信息:

代码语言:javascript
复制
客户端发送的数据是:id=666, name=张三
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Netty核心模块
  • 二、用Netty实现聊天室功能
  • 三、Netty心跳检测机制
  • 四、WebSocket长连接开发
  • 五、protobuf
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档