前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty 线程模型与基本使用

Netty 线程模型与基本使用

作者头像
Se7en258
发布2021-05-18 10:57:40
1.1K0
发布2021-05-18 10:57:40
举报
文章被收录于专栏:Se7en的架构笔记

为什么使用 Netty

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能和高伸缩性的服务器和客户端。Netty 拥有高性能,吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。

Netty 和 NIO

NIO 的缺点

  • NIO 的类库和 API 繁杂,学习成本高,你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  • 需要熟悉 Java 多线程编程。这是因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能写出高质量的 NIO 程序。
  • 臭名昭著的 epoll bug。它会导致 Selector 空轮询,最终导致 CPU 使用率飙升至 100%。直到 JDK1.7 版本依然没得到根本性的解决。

Netty 的优点

  • Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,API使用简单,学习成本低。
  • 功能强大,内置了多种解码编码器,支持多种协议。
  • 性能高,对比其他主流的 NIO 框架,Netty 的性能最优。
  • 社区活跃,发现 BUG 会及时修复,迭代版本周期短,不断加入新的功能。Dubbo、Elasticsearch 都采用了 Netty,质量得到验证。

Netty线程模型

模型解释

  • 1.Netty 抽象出两组线程池 BossGroup 和 WorkerGroup,BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写。
  • 2.BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup。NioEventLoopGroup 相当于是一个事件循环线程组,这个组内含有多个事件循环线程,每一个事件循环线程都是 NioEventLoop。
  • 3.每个 NioEventLoop 都有一个 Selector,用于监听注册在其上的 SocketChannel 的网络通讯。
  • 4.每个 Boss NioEventLoop 线程内部循环执行的3个步骤:
    • 处理 Accept 事件,与客户端建立连接,生成 NioSocketChannel。
    • 将 NioSocketChannel 注册到某个 Worker NioEventLoop 上的 Selector。
    • 处理任务队列的任务,即 runAllTask。
  • 5.每个 Worker NioEventLoop 线程内部循环执行的3个步骤:
    • 轮询注册到自己 Selector 上的所有 NioSocketChannel 的 read,write 事件。
    • 处理 I/O 事件,即 read,write 事件,在对应的 NioSocketChannel 处理业务。
    • runAllTasks 处理任务队列 TaskQueue 的任务,一些耗时的业务处理一般可以放入 TaskQueue 中慢慢处理,这样不影响数据在 Pipeline 中的流动处理。
  • 6.每个 Worker NioEventLoop 处理 NioSocketChannel 业务时,会使用 Pipeline(管道),Pipeline 中维护了很多的 handler 处理器用来处理 NioSocketChannel 中的数据。

Netty 客户端 & 服务器开发

创建并配置服务器启动器

Bootstrap、ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

Bootstrap 和 ServerBootStrap 是 Netty 提供的一个创建客户端和服务端启动器的工厂类,使用这个工厂类非常便利地创建启动类。

可以看出都是继承于AbstractBootStrap抽象类,所以大致上的配置方法都相同。

一般来说,使用Bootstrap创建启动器的步骤可分为以下几步:

group()

服务端要使用两个线程组:

  • bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到 workerGroup 的 Selector 中。
  • workerGroup 用于处理每一个连接发生的读写事件。

一般创建线程组直接使用以下new就完事了:

代码语言:javascript
复制
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

默认的线程数是cpu核数的两倍。假设想自定义线程数,可以使用有参构造器:

代码语言:javascript
复制
//设置bossGroup线程数为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//设置workerGroup线程数为8
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
channel()

这个方法用于设置通道类型,当建立连接后,会根据这个设置创建对应的 Channel 实例。

常用的就是这两个通道类型,因为是异步非阻塞的。所以是首选:

  • NioSocketChannel:异步非阻塞的客户端 TCP Socket 连接。
  • NioServerSocketChannel:异步非阻塞的服务器端 TCP Socket 连接。

还有就是同步阻塞的,一般没什么人用:

  • OioSocketChannel:同步阻塞的客户端 TCP Socket 连接。
  • OioServerSocketChannel:同步阻塞的服务器端 TCP Socket 连接。
option() 与 childOption()

option() 设置的是服务端用于接收进来的连接,也就是 boosGroup 线程。option() 常用参数:

代码语言:javascript
复制
SO_BACKLOG //Socket 参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows 为200,其他为128。

childOption() 是提供给父管道接收到的连接,也就是 workerGroup 线程。childOption() 常用的参数:

代码语言:javascript
复制
SO_RCVBUF  //Socket 参数,TCP 数据接收缓冲区大小。
TCP_NODELAY //TCP 参数,立即发送数据,默认值为 True。
SO_KEEPALIVE //Socket 参数,连接保活,默认值为 False。启用该功能时,TCP 会主动探测空闲连接的有效性。
pipeline(ChannelPipeline)

ChannelPipeline 是 Netty 处理请求的责任链,ChannelHandler 则是具体处理请求的处理器。在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:

处理器 Handler 分为两种:ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)。

一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler,通过 ChannelHandlerContext 上下文对象,就可以拿到 Channel、Pipeline 等对象,就可以进行读写等操作。

read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 Handler。两种类型的 Handler 互不干扰,相同类型的 Handler 的处理顺序是有影响的。

在 Bootstrap 中 childHandler() 方法需要初始化通道,实例化一个 ChannelInitializer,这时候需要重写 initChannel() 初始化通道的方法,装配流水线就是在这个地方进行。代码如下:

代码语言:javascript
复制
//使用匿名内部类的形式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
    //对 workerGroup的SocketChannel 设置处理器,调用我们自定义的 NettyServerHandler
      ch.pipeline().addLast(new NettyServerHandler());
   }
 });

自定义 NettyServerHandler 代码如下:

代码语言:javascript
复制
package com.chengzw.netty.base;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * 自定义 Handler 需要继承 netty 规定好的某个 HandlerAdapter(规范)
 * @author 程治玮
 * @since 2021/3/25 9:31 下午
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {


    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文对象, 含有通道 channel,管道 pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程 " + Thread.currentThread().getName());
        //将 msg 转成一个 ByteBuf,类似 NIO 的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));
    }


    /**
     * 数据读取完毕后的处理方法
     *
     * @param ctx 上下文对象, 含有通道 channel,管道 pipeline
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

}
bind()

提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动,如果加上 sync() 方法则是同步。

代码语言:javascript
复制
 // sync 同步
ChannelFuture channelFuture = bootstrap.bind(9000).sync();

// 异步
// 给cf注册监听器,监听我们关心的事件
channelFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
          if (channelFuture.isSuccess()) {
             System.out.println("监听端口9000成功");
          } else {
             System.out.println("监听端口9000失败");
          }
      }
 });
优雅地关闭 EventLoopGroup
代码语言:javascript
复制
//释放掉所有的资源,包括创建的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
服务端启动器完整代码
代码语言:javascript
复制
package com.chengzw.netty.base;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Netty 服务端
 * @author 程治玮
 * @since 2021/3/25 9:31 下午
 */
public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        // 创建两个线程组 bossGroup 和 workerGroup, 含有的子线程 NioEventLoop 的个数默认为cpu核数的两倍
        // bossGroup 只是处理连接请求 ,真正的和客户端业务处理,会交给 workerGroup 完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); //1个线程
        EventLoopGroup workerGroup = new NioEventLoopGroup(8); //8个线程

        try {
            // 创建服务器端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用链式编程来配置参数
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    // 使用 NioServerSocketChannel 作为服务器的通道实现,该类用于实例化新的 Channel 来接收客户端的连接
                    .channel(NioServerSocketChannel.class)
                    // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
                    // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //对 workerGroup的SocketChannel 设置处理器,调用我们自定义的 NettyServerHandler
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start ...");
            // 绑定一个端口并且同步, 生成了一个 ChannelFuture 异步对象,通过 isDone() 等方法可以判断异步事件的执行情况
            // 启动服务器(并绑定端口),bind 是异步操作,sync 方法是等待异步操作执行完毕
            // sync 同步
            ChannelFuture channelFuture = bootstrap.bind(9000).sync();

            // 异步
            // 给cf注册监听器,监听我们关心的事件
            /*channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (channelFuture.isSuccess()) {
                        System.out.println("监听端口9000成功");
                    } else {
                        System.out.println("监听端口9000失败");
                    }
                }
            });*/
            // 等待服务端监听端口关闭,closeFuture是异步操作
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
            // 在这里面cf.channel().closeFuture().sync();这个语句的主要目的是,如果缺失上述代码,则main方法所在的线程,
            // 即主线程会在执行完bind().sync()方法后,会进入finally 代码块,之前的启动的nettyserver也会随之关闭掉,整个程序都结束了。
            // 原文的例子有英文注释:
            // Wait until the server socket is closed,In this example, this does not happen, but you can do that to gracefully shut down your server.
            // 线程进入wait状态,也就是main线程暂时不会执行到finally里面,nettyserver也持续运行,如果监听到关闭事件,可以优雅的关闭通道和nettyserver,
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 资源优雅释放
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

创建并配置客户端启动器

客户端只需要一个 NioEventLoopGroup,其余代码和服务器类似。首先自定义 NettyClientHandler 用于处理客户端 ChannelPipeline 的业务。

代码语言:javascript
复制
package com.chengzw.netty.base;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * 自定义 Handler 需要继承 netty 规定好的某个 HandlerAdapter(规范)
 * @author 程治玮
 * @since 2021/3/25 9:50 下午
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    /**
     * 当通道有读取事件时会触发,即服务端发送数据给客户端
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
    }

    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

然后配置客户端启动器:

代码语言:javascript
复制
package com.chengzw.netty.base;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Netty 客户端
 * @author 程治玮
 * @since 2021/3/25 9:52 下午
 */
public class NettyClient {
    public static void main(String[] args) throws Exception {
        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建客户端启动对象
            //注意客户端使用的不是ServerBootstrap而是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //加入处理器
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("netty client start..");
            //启动客户端去连接服务器端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            //对通道关闭进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

参考链接

  • https://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-5
  • https://zhuanlan.zhihu.com/p/181239748
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Se7en的架构笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么使用 Netty
  • Netty 和 NIO
  • Netty线程模型
  • Netty 客户端 & 服务器开发
    • 创建并配置服务器启动器
      • Bootstrap、ServerBootstrap
      • group()
      • channel()
      • option() 与 childOption()
      • pipeline(ChannelPipeline)
      • bind()
      • 优雅地关闭 EventLoopGroup
      • 服务端启动器完整代码
    • 创建并配置客户端启动器
    • 参考链接
    相关产品与服务
    云服务器
    云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档