前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >理解Netty4线程模型-线程封闭技术实现的无锁化设计,单线程不要阻塞和无序事件如何处理(rocketmq源码分析无序事件处理)

理解Netty4线程模型-线程封闭技术实现的无锁化设计,单线程不要阻塞和无序事件如何处理(rocketmq源码分析无序事件处理)

作者头像
崔认知
发布2023-11-03 15:53:41
2520
发布2023-11-03 15:53:41
举报
文章被收录于专栏:nobodynobody

简介


Netty4的主从多线程模型是全局多线程,局部单线程(事件循环),用线程封闭技术实现的无锁化设计实现并发安全

一图理解Netty4线程模型


以使用最多的主、从多线程为例

代码语言:javascript
复制
 EventLoopGroup mainGroup = new NioEventLoopGroup(1);
 EventLoopGroup childGroup = new NioEventLoopGroup(10);
代码语言:javascript
复制

1、有两个主要的线程池,分别为boss线程池和worker线程池boss线程池主要处理客户端的连接事件,创建客户端连接并注册给worker线程池中某个线程。worker线程池主要处理客户端连接的读写事件

2、线程池抽象为NioEventLoopGroup,其中的每个线程抽象为NioEventLoop。

3、每个NioEventLoop里包含了一个Selector(注册并轮询socket事件)、一个

Thread(异步事件轮询处理线程)、一个队列tailTasks(事件循环最后需要执行的任务)、一个队列taskQueue(任务队列,排队执行任务)、一个scheduledTaskQueue(任务调度)。

4、每个客户端Channel都绑定到一个NioEventLoop(线程),每个NioEventLoop可被多个Channel绑定(即由绑定的NioEventLoop异步使用Selector轮询客户端的读写事件),可以使客户端Channel的事件处理单线程串行化无并发执行(线程封闭技术实现的无锁化设计)。

5、NioEventLoop的选择默认使用轮询调度(round-robin)策略

6、boss线程接受客户端连接后,会注册到worker线程中处理网络事件(放到任务队列,排队等待执行)。

代码语言:javascript
复制
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

7、worker线程NioEventLoop负责网络包的编码解码及业务的执行(由ChannelHandler组成的ChannelPipeline职责链模式处理),不过每个

ChannelHandler可以由不同的线程池来处理(注意线程池的选择,请参考10)。

代码语言:javascript
复制
 ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
 ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
 ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

8、Netty的单线程串行化使得网络事件的处理有序性,不会乱序。

9、切勿阻塞线程(单线程执行,发生阻塞影响后续业务处理,redis的单线程模型也是不能阻塞的),如果必须做阻塞调用或执行时间很长的任务,需要提交到其它线程池异步执行,此时网络事件的处理可能不会有序,需要业务方负责(注意线程池的选择,请参考10)。

10、异步线程池的选择很重要,会影响网络事件的有序性,一旦无序处理,需要业务方自己处理

(一般原则:排队,单线程执行 )

(一般原则:排队,多线程执行)

附:rocketmq 5.0.0 无序网络事件的处理:

rocketmq Broker端使用线程池隔离策略来处理不同的业务请求:

代码语言:javascript
复制
org.apache.rocketmq.broker.BrokerController

自定义线程池处理Netty的网络事件,脱离了Netty线程模型的有序性,rocketmq自身来处理网络事件的有序性:

1、每个请求都会携带唯一请求ID:

代码语言:javascript
复制
org.apache.rocketmq.remoting.protocol.RemotingCommand

2、每次请求都会保存请求ID到响应的关系:

代码语言:javascript
复制
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract

(1)使用ConcurrentMap来保存请求ID到响应的关系:

(2)发出请求时,获取分配的请求id,并构建响应ResponseFuture,保存其关系,等待响应返回:

3、等待响应(服务端会把请求id携带回来):

4、当客户端处理服务端的响应时,会根据返回的请求id,找到其之前保存的ResponseFuture,并设置ResponseFuture中的响应体,唤醒等待线程:

代码语言:javascript
复制
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-11-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 认知科技技术团队 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档