前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty入门篇

Netty入门篇

作者头像
贪挽懒月
发布2020-07-28 14:39:56
4170
发布2020-07-28 14:39:56
举报
文章被收录于专栏:JavaEEJavaEE

一、netty概述

1、NIO存在的问题:

  • NIO的API比较复杂,需要熟练掌握3个核心组件,channel、buffer和selector;
  • 需要熟悉多线程、网络编程等技术;
  • 开发工作量大,难度也比较大,需要解决断连、重连、网络闪断、半包读写、失败缓存、网络拥堵等各种情况;
  • NIO存在bug,一个叫Epoll的bug,会导致选择器空轮询,形成死循环,最后CPU飙到100%

正是因为NIO存在这些问题,netty就应运而生了。

2、Netty简介: netty是一个异步的,基于事件驱动的网络应用框架。可以快速地开发高性能的服务器端和客户端,像dubbo和elasticsearch底层都用了netty。它具有以下优点:

  • 设计优雅,灵活可扩展;
  • 使用方便,用户指南清晰明确;
  • 安全,完整的SSL/TLS和StartTLS支持;
  • 社区活跃,不断地更新完善

官方下载地址:https://bintray.com/netty/downloads 我本次下载的版本是:4.1.51.Final

二、netty的架构设计

1、线程模型: 目前存在的线程模式:

  • 传统阻塞IO的服务模型
  • Reactor模式

根据Reactor的数量和1处理资源的线程数不同,又分3种:

  • 单Reactor单线程;
  • 单Reactor多线程;
  • 主从Reactor多线程

Netty的线程模型是基于主从Reactor多线程做了改进。

2、传统阻塞IO的线程模型: 采用阻塞IO获取输入的数据,每个连接都需要独立的线程来处理逻辑。存在的问题就是,当并发数很大时,就需要创建很多的线程,占用大量的资源。连接创建后,如果当前线程没有数据可读,该线程将会阻塞在读数据的方法上,造成线程资源浪费。

3、Reactor模式(分发者模式/反应器模式/通知者模式): 针对传统阻塞IO的模型,做了以下两点改进:

  • 基于IO复用模型:多个客户端共用一个阻塞对象,而不是每个客户端都对应一个阻塞对象
  • 基于线程池复用线程资源:使用了线程池,而不是每来一个客户端就创建一个线程

Reactor模式的核心组成:

  • Reactor:Reactor就是多个客户端共用的那一个阻塞对象,它单独起一个线程运行,负责监听和分发事件,将请求分发给适当的处理程序来进行处理
  • Handler:处理程序要完成的实际事件,也就是真正执行业务逻辑的程序,它是非阻塞的

4、单Reactor单线程:

模型图

这个图其实就跟之前的NIO群聊系统对应。多个客户端请求连接,然后Reactor通过selector轮询判断哪些通道是有事件发生的,如果是连接事件,就到了Acceptor中建立连接;如果是其他读写事件,就有dispatch分发到对应的handler中进行处理。这种模式的缺点就是Reactor和Handler是在一个线程中的,如果Handler阻塞了,那么程序就阻塞了。

5、单Reactor多线程:

单reactor多线程

处理流程如下:

  • Reactor对象通过Selector监听客户端请求事件,通过dispatch进行分发;
  • 如果是连接事件,则由Acceptor通过accept方法处理连接请求,然后创建一个Handler对象响应事件;
  • 如果不是连接请求,则由Reactor对象调用对应handler对象进行处理;handler只响应事件,不做具体的业务处理,它通过read方法读取数据后,会分发给线程池的某个线程进行业务处理,并将处理结果返回给handler;
  • handler收到响应后,通过send方法将结果返回给client。

相比单Reactor单线程,这里将业务处理的事情交给了不同的线程去做,发挥了多核CPU的性能。但是Reactor只有一个,所有事件的监听和响应,都由一个Reactor去完成,并发性还是不好。

6、主从Reactor多线程:

主从reactor多线程

这个模型相比单reactor多线程的区别就是:专门搞了一个MainReactor来处理连接事件,如果不是连接事件,就分发给SubReactor进行处理。图中这个SubReactor只有一个,其实是可以有多个的,所以性能就上去了。

  • 优点:父线程与子线程的交互简单、职责明确,父线程负责接收连接,子线程负责完成后续的业务处理;
  • 缺点:编程复杂度高

7、netty的模型: netty模型是基于主从Reactor多线程模型设计的,其工作流程如下:

  • Netty有两组线程池,一个Boss Group,它专门负责客户端连接,另一个Work Group,专门负责网络读写;
  • Boss Group和Work Group的类型都是NIOEventLoopGroup;
  • NIOEventLoopGroup相当于一个事件循环组,这个组包含了多个事件循环,每一个循环都是NIOEventLoop;
  • NIOEventLoop表示一个不断循环执行处理任务的线程,每个NIOEventLoop都有一个Selector,用于监听绑定在其上的socket;
  • Boss Group下的每个NIOEventLoop的执行步骤有3步:(1). 轮询accept连接事件;(2). 处理accept事件,与client建立连接,生成一个NioSocketChannel,并将其注册到某个work group下的NioEventLoop的selector上;(3). 处理任务队列的任务,即runAllTasks;
  • 每个Work Group下的NioEventLoop循环执行以下步骤:(1). 轮询read、write事件;(2). 处理read、write事件,在对应的NioSocketChannel处理;(3). 处理任务队列的任务,即runAllTasks;
  • 每个Work Group下的NioEventLoop在处理业务时,会使用pipeline(管道),pipeline中包含了channel,即通过pipeline可以获取到对应的channel,pipeline中维护了很多的处理器。


netty模型图如下,对应了上面那段流程:

netty模型图

三、netty入门实例

使用netty创建一个服务端与客户端,监听6666端口。

1、服务端:

  • NettyServer:
public class NettyServer {

    public static void main(String[] args) throws Exception {
        // 1. 创建boss group (boss group和work group含有的子线程数默认是cpu数 * 2)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 2. 创建work group
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // 3. 创建服务端启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 4. 配置启动参数
            bootstrap.group(bossGroup, workGroup) // 设置两个线程组
                     .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
                     .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接个数
                     .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
                     .childHandler(new ChannelInitializer<SocketChannel>() { // 创建通道初始化对象
                        // 给pipeline设置处理器
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // 传入自定义的handler
                            sc.pipeline().addLast(new NettyServerHandler());
                        } 
                    });
            // 5. 启动服务器并绑定端口
            ChannelFuture cf = bootstrap.bind(6666).sync();
            // 6. 对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
        
    }
}
  • NettyServerHandler:
public class NettyServerHandler extends ChannelInboundHandlerAdapter{
    
    // 读取数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("接收到客户端消息:" + buf.toString(CharsetUtil.UTF_8));
    }
    
    // 数据读取完毕后
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,我是服务端", CharsetUtil.UTF_8));
    }
    
    // 处理异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

2、客户端:

  • NettyClient:
public class NettyClient {

    public static void main(String[] args) throws Exception {
        // 1. 创建事件循环组
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            // 2. 创建启动对象
            Bootstrap bootstrap = new Bootstrap();
            // 3. 设置相关参数
            bootstrap.group(eventLoopGroup) // 设置线程组
                     .channel(NioSocketChannel.class) // 设置通道
                     .handler(new ChannelInitializer<SocketChannel>() {
                        // 给pipeline设置处理器
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            // 4. 连接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            // 5. 监听通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
        
    }
}
  • NettyClientHandler:
public class NettyClientHandler extends ChannelInboundHandlerAdapter{

    // 通道就绪就被触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client:" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,我是客户端", CharsetUtil.UTF_8));
    }
    
    // 读取数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("接收到服务端消息:" + buf.toString(CharsetUtil.UTF_8));
    }
    
    // 处理异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

先启动服务端,然后启动客户端,就能看到服务端和客户端在控制台打印出来的消息了。

3、TaskQueue自定义任务: 上面服务端的NettyServerHandler的channelRead方法中,假如有一个非常耗时的业务,那么就会阻塞在那里,直到业务执行完。比如将NettyServerHandler的channelRead方法改成下面这样:

// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 线程休眠10秒,模拟耗时业务
    TimeUnit.SECONDS.sleep(10);
    ByteBuf buf = (ByteBuf) msg;
    System.out.println("接收到客户端消息:" + buf.toString(CharsetUtil.UTF_8));
}

启动后会发现,10秒钟后,服务端才会收到客户端发送的消息,客户端也要10秒后,才会接收到服务端的消息,即服务端的channelReadComplete方法是要在channelRead方法执行完后才会执行的。

一直阻塞在那里也不是办法,希望可以异步执行,那我们就可以把该任务提交到该channel对应的NioEventLoop的TaskQueue中。有以下解决方案:

  • 用户程序自定义的普通任务:将channelRead方法改成下面这样:
// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.channel().eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) { e.printStackTrace();}
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("接收到客户端消息:" + buf.toString(CharsetUtil.UTF_8));
        }
    });
}

这里仍旧休眠10秒。启动服务端,再启动客户端,发现服务端要10s后才会打印出客户端发送的消息,但是客户端立刻就可以收到服务端在channelReadComplete方法里发送的消息,说明这次是异步的。

  • 用户自定义定时任务:与上面的区别不大,代码如下:
// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.channel().eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
            try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) { e.printStackTrace();}
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("接收到客户端消息:" + buf.toString(CharsetUtil.UTF_8));
        }
    }, 5, TimeUnit.SECONDS);
}

启动服务端,然后启动客户端,发现客户端还是会立即收到服务端发出的消息,而服务端,首先要等待5秒才去执行run方法,run方法里面线程又休眠了10秒,所以总共要15秒后才会打印出客户端发送的消息。

  • 非当前Reactor线程调用channel的各种方法:这个的意思就是,如果我别的业务代码,比如消息推送系统,也想给客户端发送消息,该咋整?其实很简单,在NettyServerHandler的channelRead方法里,我们是通过ChannelHandlerContext 拿到Channel然后进行各种操作的,所以拿到了Channel就可以进行操作。那么可以在NettyServer中将Channel保存到集合中去,然后遍历集合,拿到Channel就可以进行操作了。
// 给pipeline设置处理器
@Override
protected void initChannel(SocketChannel sc) throws Exception {
    // 传入自定义的handler
    sc.pipeline().addLast(new NettyServerHandler());
    // 在这里,可以将SocketChannel sc保存到集合中,别的线程拿到集合就可以调用channel的方法了
} 

四、Netty的异步模型

1、基本介绍:

  • Netty中的I/O操作都是异步的,包括bind、write和connect。这些操作会返回一个ChannelFuture对象,而不会立即返回操作结果。
  • 调用者不能立即得到返回结果,而是通过Futrue-Listener机制,用户可以主动获取或者通过通知机制获得IO操作的结果。
  • Netty的异步是建立在future和callback之上的。callback是回调,future表示异步执行的结果,它的核心思想是:假设有个方法fun(),计算过程可能非常耗时,等待fun()返回要很久,那么可以在调用fun()的时候,立马返回一个future,后续通过future去监控fun()方法的处理过程,这就是future-listener机制。
  • 用户可以通过注册监听函数,来获取操作真正的结果,ChannelFuture常用的函数如下:
// 判断当前操作是否完成
isDone
// 判断当前操作是否成功
isSuccess
// 获取操作失败的原因
getCause
// 判断当前操作是否被取消
isCancelled
// 注册监听器
addListener

2、使用监听器: 在NettyServer中的“启动并绑定端口”下面加上如下代码:

// 5. 启动服务器并绑定端口
ChannelFuture cf = bootstrap.bind(6666).sync();
// 注册监听器
cf.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture cf) throws Exception {
        if (cf.isSuccess()) {
            System.out.println("绑定端口成功");
        } else {
            System.out.println("绑定端口失败");
        }
    }
});

这样就注册了监听器,会监听绑定端口的结果,如果成功了,就会打印出绑定成功这段话。

五、使用Netty开发Http服务

开发一个Netty服务端,监听80端口,浏览器访问localhost,可以返回信息给浏览器。代码如下:

  • NettyHttpServer:
public class NettyHttpServer {

    public static void main(String[] args) throws Exception {
        // 1. 创建boss group (boss group和work group含有的子线程数默认是cpu数 * 2)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 2. 创建work group
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // 3. 创建服务端启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 4. 配置启动参数
            bootstrap.group(bossGroup, workGroup) // 设置两个线程组
                    .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
                    .childHandler(new NettyHttpServerInitializer());
            // 5. 启动服务器并绑定端口
            ChannelFuture cf = bootstrap.bind(80).sync();
            // 6. 对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

}
  • NettyHttpServerInitializer:
public class NettyHttpServerInitializer extends ChannelInitializer<SocketChannel> {

    // 向管道加入处理器
    @Override
    protected void initChannel(SocketChannel sc) throws Exception {
        // 1. 得到管道
        ChannelPipeline pipeline = sc.pipeline();
        // 2. 加入一个编码器解码器
        pipeline.addLast("codec", new HttpServerCodec());
        // 3. 增加一个自定义的handler处理器
        pipeline.addLast("handler", new NettyHttpServerHandler());
    }
}
  • NettyHttpServerHandler:
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject>{

    // 读取数据
    @Override
    protected void channelRead0(ChannelHandlerContext chc, HttpObject msg) throws Exception {
        // 1. 判断msg是不是httpRequest请求
        if (msg instanceof HttpRequest) {
            System.out.println("msg类型:" + msg.getClass());
            System.out.println("客户端地址:" + chc.channel().remoteAddress());
            // 对特定资源不做响应
            HttpRequest httpRequest = (HttpRequest) msg;
            URI uri = new URI(httpRequest.uri());
            if ("/favicon.ico".equals(uri.getPath())) {
                System.out.println("请求了图标,不做响应");
                return;
            }
            // 2. 创建回复给浏览器的信息
            ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器,are you ok?", CharsetUtil.UTF_8);
            // 3. 构造http响应
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            // 4. 将response返回
            chc.writeAndFlush(response);
        }
    }
}

启动server后,在浏览器就访问localhost就可以返回相关内容了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、netty概述
  • 二、netty的架构设计
  • 三、netty入门实例
  • 四、Netty的异步模型
  • 五、使用Netty开发Http服务
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档