首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java高阶必备之Netty基础原理

Java高阶必备之Netty基础原理

作者头像
老钱
发布2018-08-15 16:37:40
5120
发布2018-08-15 16:37:40
举报
文章被收录于专栏:码洞码洞

Netty是Java程序员通向高阶之路必须要过的门槛之一。干了几年的Java程序员发现业务开发似乎就是在SSH的世界里摸滚打爬的时候,会开始感到迷茫,难道程序员的日子就是如此枯燥么?深入使用一下Netty,另一个世界的大门就会开始打开。枯燥的编码会渐渐变得有趣,自主思考的能力也会开始加强。

Netty是建立在Java NIO基础之上最广泛使用的高性能网络框架。了解Netty之前,必须对NIO的概念有所了解。

NIO的意思是非阻塞IO,也就是说单个线程可以同时进行多个IO操作,而不会被任何IO操作阻塞住。同一个线程即能同时Accept网络套件字,又可以同时对套件字进行读写操作,然后还可以同时处理消息。

NIO基于事件机制,所有的IO操作都能抽象成一个事件。当新连接到来时,可以从内核中拿到ServerSocket的可读事件。当连接上的消息到来时,可以从内核中拿到Socket的读事件。当Socket中的缓冲区未满的时候,可以从内核中拿到Socket的可写事件。

当NIO线程从内核中拿到一个事件Event,就会开始使用相应的事件处理器EventHandler对这个事件进行处理。如果拿到ServerSocket可读事件,就会调用ServerSocket.accept获取一个新的Socket连接,然后将这个Socket连接加入到感兴趣的描述符列表中,如果拿到Socket可读事件就会开始调用Socket.read读取套件字的消息进行处理,处理完毕将返回结果序列化成一个字节数组,当Socket可以拿到可写事件时,说明套件字缓冲区未满,就拼命的将字节数组往Socket里灌,也就是调用Socket.write进行IO的写操作。

NIO从内核中拿事件的操作使用的是Selector.select函数调用,它对应操作系统界面的IO多路复用API。在现代操作系统里mac平台上对应的是kqueue模型,linux平台对应的是epoll模型,windows平台对应的是iocp模型。Java是一个跨平台的语言,JVM底层对操作系统的具体实现进行了抽象,统一向上层提供的是Selector系列API。用户只需要使用Selector提供的通用API来处理NIO相关功能即可,而无需关心底层具体操作系统API的差别了。

Selector可以理解为一个描述符对象[SocketChannel]列表,Selector通过调用操作系统API,传递一个描述符列表参数,然后就可以拿到内核提供的与所有的描述符相关的事件[Key]列表。

上面提到的NIO线程是一个单线程,但是实际上它可以是一个线程池,线程池中的每个线程负责一部分描述符的读写操作。它也可以是两个线程池,一个线程池只用来处理ServerSocket描述符建立新连接,另一个线程池专门干Socket读写的事。

// 两个线程池
ServerBootstrap bootstrap1 = new ServerBootstrap();
NioEventLoopGroup acceptorGroup = new NioEventLoopGroup(1); // 包含一个线程
NioEventLoopGroup rwGroup = new NioEventLoopGroup(4); // 包含四个线程
bootstrap1.group(acceptorGroup, rwGroup).channel(NioServerSocketChannel.class);

// 共享一个线程池
ServerBootstrap bootstrap2 = new ServerBootstrap();
NioEventLoopGroup group = new NioEventLoopGroup(4);
bootstrap2.group(group).channel(NioServerSocketChannel.class);

Netty提供了良好的封装,可以让我们很方便的配置线程池的功用。代码中的NioEventLoopGroup代表的就是一个线程池,池中每个线程都是一个独立的NioEventLoop,即Nio事件循环。当acceptor线程池接收到一个新连接后会将这个连接通过队列发送到读写线程池继续进行处理。线程池分开的好处是当读写线程池繁忙的时候不影响acceptor接收新连接。

NIO的读写操作也是一系列复杂的过程。当NIO读事件发生时,线程使用read操作读取到的消息可能是不完整的,剩下的部分可能还要在接下来多次读事件发生后才能读到完整的一个消息对象字节数组。也可能read操作读取到的消息包含多个消息对象,最后剩下的部分又是一个不完整的消息,这就需要在每个描述符关联对象中保存中间半包的状态。消息和消息之间又有组合关系,比如HTTP POST消息包含HTTP Header和HTTP Body两个部分,而HTTP Body又可能因为太大而分解为多个HTTP Chunks进行传输,这就要求NIO的读写消息的设计包含结构层级。写操作也不是一个简单的write操作就了事了,写操作要考虑到内核为每个套件字分配的buffer大小,如果buffer不够了,write写进去的数组是不能完全写进去的,写不进去的字节必须保留起来,等待下次写事件发生时,也就是内核缓冲有空闲空间了,才可以将剩下的数据发送过去。

Netty将消息的读写抽象为pipeline消息管道,结构上有点类似于计算机网络分层结构。pipeline的每一层会对应一个Handler,以上一层输出的消息结构作为输入,输出新的消息结构作为下一层的输入。pipeline对象挂接在每一个Socket链路上。

EventLoopGroup group = new NioEventLoopGroup(2);
ServerBootstrap bootstrap = new ServerBootstrap();
RedisOutputEncoder encoder = new RedisOutputEncoder();
MessageProcessor processor = new MessageProcessor();
bootstrap.group(group)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
	@Override
	public void initChannel(SocketChannel ch) throws Exception {
		ChannelPipeline pipe = ch.pipeline();
		pipe.addLast(new ReadTimeoutHandler(60));
		pipe.addLast(new RedisInputDecoder());
		pipe.addLast(encoder);
		pipe.addLast(processor);
	}});

bootstrap.option(ChannelOption.SO_BACKLOG, 100)
 .option(ChannelOption.SO_REUSEADDR, true)
 .childOption(ChannelOption.TCP_NODELAY, true)
 .childOption(ChannelOption.SO_KEEPALIVE, true)
 .bind("localhost", 6379)
 .sync();

代码中我们在pipeline里定义了四层Handler,第一个是处理ReadTimeout,当一个连接长达60s没有任何消息的情况下会向下一层输出一个读超时消息。第二层是一个Redis消息解码器,将Socket中的字节流转换成Redis命令对象,第三层是一个Redis消息编码器,将Redis输出对象转称字节流,第四层是消息处理器,用来逐个处理Redis命令逻辑,这里一般就是我们复杂的业务逻辑所在地,我们会在业务逻辑里最终给Socket回馈消息输出,这个消息输出又会走一遍pipeline的每一层,直到转换成字节流写到内核socket缓冲区中才算完事。

然后我们设置一些套件字的特殊属性,比如监听队列大小、读写缓冲警戒水位大小、是否延迟发送等,然后绑定监听指定端口,服务器就可以开始永无止尽地工作了。

下面我们看核心解码器的实现,解码器要处理半包问题,也就是说当消息到来时,我们要用网络字节填充消息对象,结果填充了一半,字节没了,然后又要再次等待下一波字节,再将剩下内容填满。那这里有个问题就是需要记录当前消息对象填充状态,填充到哪里了,以免下次还需要重新填充。如果我们不记录填充状态,就需要将读取的网络字节再回退回去,然后待下一波消息来了,重新填充一个新对象,在网络环境较差的情况下势必会产生大量重复填充操作。所以Netty提供了ReplayingDecoder专门来封装处理半包消息的加码器。

上面的代码片段是Redis命令消息解码器的框架实现,Redis的命令消息是由参数的数量参数和多个字符串参数组成,半包的情况下我们可能只读到了部分参数,所以需要将读到的参数的位置记下来,后续网络字节到来时,只需要读取剩下的参数就可以了,读了一个完整的消息,就塞进out对象从管道上继续传递下去。

相比之下编码器就简单多了,只需要将消息序列化成字节数组填充到ByteBuf里,然后传递给pipeline就了事了。下面的代码是Redis数组对象的编码实现。

数组对象可以包含多种其它对象类型,所以需要用一个容器放置子对象。我们需要返回数组结果时,构造出一个ArrayOutput对象response,然后调用ctx.writeAndFlush(response)就可以将返回对象沿着pipeline传递给客户端了。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-01-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码洞 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档