Netty 系列四(ChannelHandler 和 ChannelPipeline).

一、概念

    先来整体的介绍一下这篇博文要介绍的几个概念(Channel、ChannelHandler、ChannelPipeline、ChannelHandlerContext、ChannelPromise):

Channel:Netty 中传入或传出数据的载体; ChannelHandler:Netty 中处理入站和出站数据的应用程序逻辑的容器; ChannelPipeline:ChannelHandler链 的容器; ChannelHandlerContext:代表了 ChannelHandler 和 ChannelPipeline 之间的关联,每当有ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext; ChannelPromise:ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(), 从而使ChannelFuture不可变。

    我们来举一个例子描述这些概念之间的逻辑关系:服务端接收到客户端的连接请求,创建一个Channel同客户端进行绑定,新创建的 Channel 会都将会被分配一个新的ChannelPipeline(这项关联是永久性的,Channel 既不会附加另外一个ChannelPipeline,也不能分离当前的)。而 ChannelPipeline 作为 ChannelHandler链 的容器,当Channel 生命周期中状态发生改变时,将会生成对应的事件,这些事件将会被 ChannelPipeline 中 ChannelHandler 所响应,响应方法的参数一般都有一个 ChannelHandlerContext ,一个 ChannelHandler 对应一个 ChannelHandlerContext。

二、ChannelHandler

    Netty提供了大量预定义的可以开箱即用的ChannelHandler实现,包括用于各种协议的ChannelHandler。因此,我们在自定义ChannelHandler实现用于处理我们的程序逻辑时,只需要继承Netty 的一些默认实现即可,主要有两种:

1、继承 ChannelHandlerAdapter (在4.0 中 处理入站事件继承 ChannelInboundHandlerAdapter,处理出站事件继承 ChannelOutboundHandlerAdapter ;在5.0 推荐直接继承 ChannelHandlerAdapter)

2、继承 SimpleChannelInboundHandler

    这两种方式有什么区别呢?  当我们处理 入站数据 和 出站数据时,都需要确保没有任何的资源泄露。在入站方向,继承 SimpleChannelInboundHandler 的实现类会在消息被处理之后自动处理消息,而继承 ChannelHandlerAdapter 的实现类需要手动的释放消息(ReferenceCountUtil.release(msg));在出站方向,不管继承的是哪一种的实现类,当你处理了 write() 操作并丢弃了一个消息,那么你就应该释放它,不仅如此,还要通知 ChannelPromise。否则可能会出现 ChannelFutureListener 收不到某个消息已经被处理了的通知的情况。

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ReferenceCountUtil.release(msg);
    //ChannelPromise 是ChannelFuture的一个子类,设置成 true 通知 ChannelFutureListener 消息已经被处理了
    //当一个 Promise 被完成之后,其对应的 Future 的值便不能再进行任何修改了
    promise.setSuccess();
}

tips:总之,如果一个消息被消费或者丢弃了, 并且没有传递给 ChannelPipeline 中的下一个ChannelOutboundHandler, 那么用户就有责任调用 ReferenceCountUtil.release()。

    来看看 ChannelHandler 的 API:

isSharable :如果其对应的实现被标注为 Sharable, 那么这个方法将返回 true, 表示它可以被添加到多个 ChannelPipeline中

-- ChannelHandler 生命周期方法 -- handlerAdded :当把 ChannelHandler 添加到 ChannelPipeline 中时被调用 handlerRemoved :当从 ChannelPipeline 中移除 ChannelHandler 时被调用 exceptionCaught : 当处理过程中在 ChannelPipeline 中有错误产生时被调用

-- 处理入站数据以及各种状态变化 -- channelRegistered : 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用 channelUnregistered : 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调用 channelActive : 当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪 channelInactive : 当 Channel 离开活动状态并且不再连接它的远程节点时被调用 channelReadComplete : 当Channel上的一个读操作完成时被调用 channelRead : 当从 Channel 读取数据时被调用 ChannelWritabilityChanged :当 Channel 的可写状态发生改变时被调用。 userEventTriggered : 当 ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用,因为一个 POJO 被传经了 ChannelPipeline

-- 处理出站数据并且允许拦截所有的操作 -- bind : 当请求将 Channel 绑定到本地地址时被调用 connect : 当请求将 Channel 连接到远程节点时被调用 disconnect : 当请求将 Channel 从远程节点断开时被调用 close : 当请求关闭 Channel 时被调用 deregister : 当请求将 Channel 从它的 EventLoop 注销时被调用 read : 当请求从 Channel 读取更多的数据时被调用 flush : 当请求通过 Channel 将入队数据冲刷到远程节点时被调用 write :当请求通过 Channel 将数据写到远程节点时被调用

三、ChannelPipeline

    ChannelPipeline 是一个拦截流经 Channel 的入站和出站事件的ChannelHandler 实例链,它和 ChannelHandler 之间的交互组成了应用程序数据和事件处理逻辑的核心,而它们之间的关联交互就是通过 ChannelHandlerContext。

    如果一个入站事件被触发,它将被从 ChannelPipeline 的头部开始一直被传播到 Channel Pipeline 的尾端。如图,Netty 总是将 ChannelPipeline 的入站口作为头部,而将出站口作为尾端,如图,第一个被入站事件看到的 ChannelHandler 将是1,而第一个被出站事件看到的是 ChannelHandler 将是 5。

    既然 ChannelPipeline 是 ChannelHandler链 的容器,让我们来看看ChannelPipeline 是如何管理 ChannelHandler的吧!

addFirst : 将一个 ChannelHandler 添加到 ChannelPipeline 最开始位置中 addBefore :将一个 ChannelHandler 添加到 ChannelPipeline 某个ChannelHandler前 addAfter:将一个 ChannelHandler 添加到 ChannelPipeline 某个ChannelHandler后 addLast : 将一个 ChannelHandler 添加到 ChannelPipeline 最末尾位置 remove :将一个 ChannelHandler 从 ChannelPipeline 中移除 replace :将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler get :通过类型或者名称返回 ChannelHandler context :返回和 ChannelHandler 绑定的 ChannelHandlerContext names :返回 ChannelPipeline 中所有 ChannelHandler 的名称

ChannelPipeline 的API 用于调用入站操作的附加方法:

fireChannelRegistered: 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelRegistered(ChannelHandlerContext)方法 fireChannelUnregistered: 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelUnregistered(ChannelHandlerContext)方法 fireChannelActive: 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelActive(ChannelHandlerContext)方法 fireChannelInactive: 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelInactive(ChannelHandlerContext)方法 fireExceptionCaught: 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的exceptionCaught(ChannelHandlerContext, Throwable)方法 fireUserEventTriggered: 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的userEventTriggered(ChannelHandlerContext, Object)方法 fireChannelRead: 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelRead(ChannelHandlerContext, Object msg)方法 fireChannelReadComplete: 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelReadComplete(ChannelHandlerContext)方法 fireChannelWritabilityChanged: 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的channelWritabilityChanged(ChannelHandlerContext)方法

ChannelPipeline 的API 用于调用出站操作的附加方法:

bind: 将 Channel 绑定到一个本地地址,这将调用 ChannelPipeline 中的下一个ChannelOutboundHandler 的 bind方法 connect: 将 Channel 连接到一个远程地址,这将调用 ChannelPipeline 中的下一个ChannelOutboundHandler 的 connect方法 disconnect: 将 Channel 断开连接。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 disconnect方法 close: 将 Channel 关闭。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 close方法 deregister: 将 Channel 从它先前所分配的 EventExecutor(即 EventLoop)中注销。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 deregister方法 flush: 冲刷Channel所有挂起的写入。这将调用ChannelPipeline中的下一个ChannelOutboundHandler 的 flush方法 write: 将消息写入 Channel。这将调用 ChannelPipeline 中的下一个 ChannelOutboundHandler的write方法。注意:这并不会将消息写入底层的 Socket,而只会将它放入队列中。要将它写入 Socket,需要调用 flush()或者 writeAndFlush()方法 writeAndFlush: 这是一个先调用 write()方法再接着调用 flush()方法的便利方法 read: 请求从 Channel 中读取更多的数据。这将调用 ChannelPipeline 中的下一个ChannelOutboundHandler 的 read(ChannelHandlerContext)方法

四、ChannelHandlerContext

    ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联。

    ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者 ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。而调用位于 ChannelHandlerContext上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。因此,尽量使用 ChannelHandlerContext 的同名方法来处理逻辑,因为它将产生更短的事件流, 应该尽可能地利用这个特性来获得最大的性能。

五、寄语

    有时候也会迷茫,身边的人理论基础差一些的的,代码不一样敲的好好的?而我花大量时间细细的去研究这么理论真的值得吗?仔细想想,人生很多事情本来就是徒劳无功的啊,没必要急功近利,欲速则不达。要坚信,一切的付出总会在人生的某个时刻回报在我们身上。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏互联网大杂烩

Java锁与并发

保护临界区资源不会被多个线程同时访问时而受到破坏。通过锁,可以让多个线程排队。一个一个地进入临界区访问目标对象,使目标对象的状态总是保持一致。

1062
来自专栏java工会

Spring工作原理

内部最核心的就是IOC了,动态注入,让一个对象的创建不用new了,可以自动的生产,这其实就是利用java里的反射,反射其实就是在运行时动态的去创建、...

731
来自专栏JAVA技术zhai

干货:Java并发编程系列之synchronized(一)

2817
来自专栏互联网技术杂谈

flume 1.8.0 开发之RPC

flume开发基础可见:https://cloud.tencent.com/developer/article/1195082

4285
来自专栏java工会

Spring工作原理

内部最核心的就是IOC了,动态注入,让一个对象的创建不用new了,可以自动的生产,这其实就是利用java里的反射,反射其实就是在运行时动态的去创建、...

891
来自专栏JAVA高级架构

spring和springMVC的面试问题总结

1.Spring中AOP的应用场景、Aop原理、好处? 答:AOP--Aspect Oriented Programming面向切面编程;用来封装横切关注点,具...

3699
来自专栏芋道源码1024

注册中心 Eureka 源码解析 —— 网络通信

本文主要分享 Eureka 的网络通信部分。在不考虑 Eureka 2.x 的兼容的情况下,Eureka 1.x 主要两部分的网络通信:

1102
来自专栏Java架构师历程

Spring工作原理

      内部最核心的就是IOC了,动态注入,让一个对象的创建不用new了,可以自动的生产,这其实就是利用java里的反射,反射其实就是在运行时动态的去创建、...

2092
来自专栏Web项目聚集地

从零学习Spring MVC框架「RESTful风格实践」

学习本文章之前,我们需要知道什么是RESTful API,还对此不了解的朋友可以移步历史文章 RESTful 接口实现简明指南 ,简单来说就是就是用URL定位资...

1791
来自专栏竹清助手

浅谈Linux磁盘修复e2fsck命令

检查 /dev/mapper/VolGroup00-LogVol02 是否有问题,如发现问题便自动修复:

2692

扫码关注云+社区

领取腾讯云代金券