简介
前文学习Netty的ByteBuf数据容器。本文开始学习ChannelPipeline和ChannelHandler,它们的角色非常类似于流水生产线。
Channel接口定义了一个简单但是强大的状态模型,该模型与ChannelInboundHandler API紧密联系。
ChannelInboundHandler源码如下
public interface ChannelInboundHandler extends ChannelHandler {
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} is now active
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
* end of lifetime.
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* Invoked when the current {@link Channel} has read a message from the peer.
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* Invoked when the last message read by the current read operation has been consumed by
* {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
* attempt to read an inbound data from the current {@link Channel} will be made until
* {@link ChannelHandlerContext#read()} is called.
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if an user event was triggered.
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
* Gets called once the writable state of a {@link Channel} changed. You can check the state with
* {@link Channel#isWritable()}.
*/
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if a {@link Throwable} was thrown.
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;}
其中标记Channel生命周期状态的方法如下:
以下列出Channel的4个状态:
ChannelUnregistered
:Channel已经被创建,但还未注册到EventLoopChannelRegistered
:Channel已经被注册到了EventLoopChannelActive
:Channel处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了ChannelInactive
:Channel没有连接到远程节点
Channel的正常生命周期如下图所示
当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline中的ChannelHandler,其可以随后对它们做出响应。
在 ChannelHandler
被添加到 ChannelPipeline
中或者被从 ChannelPipeline
中移除时会调用这些操作,这些方法中的每一个都接受一个 ChannelHandlerContext
参数。
ChannelHandler源码如下:
public interface ChannelHandler {
/**
* Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
* anymore.
*/
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if a {@link Throwable} was thrown.
*
* @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
* implement the method there.
*/
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
/**
* Indicates that the same instance of the annotated {@link ChannelHandler}
* can be added to one or more {@link ChannelPipeline}s multiple times
* without a race condition.
* <p>
* If this annotation is not specified, you have to create a new handler
* instance every time you add it to a pipeline because it has unshared
* state such as member variables.
* <p>
* This annotation is provided for documentation purpose, just like
* <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
*/
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
ChannelHandler的生命周期方法解析:
handlerAdded
:当把ChannelHandler
添加到ChannelPipeline
中时被调用handlerRemoved
:当从ChannelPipeline
中移除ChannelHandler
时被调用exceptionCaught
:当处理过程中在ChannelPipeline
中有错误产生时被调用
Netty定义了下面两个重要的ChannelHandler子接口:
ChannelInboundHandler
——处理入站数据以及各种状态变化ChannelOutboundHandler
——处理出站数据并且允许拦截所有的操作
ChannelInboundHandler
接口生命周期中的方法,当接受到数据或者其对应的 Channel
的状态发生变化则会调用方法
ChannelInboundHandler
源码如下:
1. public interface ChannelInboundHandler extends ChannelHandler {
2.
3. /**
4. * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
5. */
6. void channelRegistered(ChannelHandlerContext ctx) throws Exception;
7.
8. /**
9. * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
10. */
11. void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
12.
13. /**
14. * The {@link Channel} of the {@link ChannelHandlerContext} is now active
15. */
16. void channelActive(ChannelHandlerContext ctx) throws Exception;
17.
18. /**
19. * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
20. * end of lifetime.
21. */
22. void channelInactive(ChannelHandlerContext ctx) throws Exception;
23.
24. /**
25. * Invoked when the current {@link Channel} has read a message from the peer.
26. */
27. void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
28.
29. /**
30. * Invoked when the last message read by the current read operation has been consumed by
31. * {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
32. * attempt to read an inbound data from the current {@link Channel} will be made until
33. * {@link ChannelHandlerContext#read()} is called.
34. */
35. void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
36.
37. /**
38. * Gets called if an user event was triggered.
39. */
40. void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
41.
42. /**
43. * Gets called once the writable state of a {@link Channel} changed. You can check the state with
44. * {@link Channel#isWritable()}.
45. */
46. void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
47.
48. /**
49. * Gets called if a {@link Throwable} was thrown.
50. */
51. @Override
52. @SuppressWarnings("deprecation")
53. void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
54. }
当某个 ChannelInboundHandler
的实现重写 channelRead()
方法时,它将负责显式地释放与池化 ByteBuf
实例相关的内存,Netty为此提供了一个实用方法 ReferenceCountUtil.release()
。
1. @ChannelHandler.Sharable
2. //扩展了ChannelInboundHandlerAdapter
3. public class DiscardHandler extends ChannelInboundHandlerAdapter{
4.
5. @Override
6. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
7. //丢弃已接收的消息
8. ReferenceCountUtil.release(msg);
9. }
10. }
当你不想处理这些释放资源等操作的话,Netty提供了SimpleChannelInboundHandler会自动释放资源,因此无需显式释放,代码如下:
1. public class MySimpleHandler
2. extends SimpleChannelInboundHandler<Object> {
3. @Override
4. public void channelRead0(ChannelHandlerContext ctx,
5. Object msg) {
6. // No need to do anything special
7. }
8. }
为了更好地理解,我们看看SimpleChannelInboundHandler的源码,从中可以看到,它已经帮我们释放资源 了的,我们只需要实现channelRead0方法,在channelRead0()处理我们的业务逻辑即可。
1. public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
2. //省略了很多代码。。。。。。
3. @Override
4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
5. boolean release = true;
6. try {
7. if (acceptInboundMessage(msg)) {
8. @SuppressWarnings("unchecked")
9. I imsg = (I) msg;
10. channelRead0(ctx, imsg);
11. } else {
12. release = false;
13. ctx.fireChannelRead(msg);
14. }
15. } finally {
16. if (autoRelease && release) {
17. ReferenceCountUtil.release(msg);
18. }
19. }
20. }
21. //省略了很多代码。。。。。。
22.
23. protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
24.
25. }
出站操作和数据将由 ChannelOutboundHandler
处理,它的方法将被 Channel
、 ChannelPipeline
以及 ChannelHandlerContext
调用。
ChannelOutboundHandler
的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷并在稍后继续。
ChannelPromise与ChannelFuture :
ChannelOutboundHandler
中的大部分方法都需要一个ChannelPromise
参数,以便在操作完成时得到通知。ChannelPromise
是ChannelFuture
的一个子类,其定义了一些可写的方法,如setSuccess()
和setFailure()
,从而使ChannelFuture不可变。
ChannelOutboundHandler源码如下:
1. /**
2. * {@link ChannelHandler} which will get notified for IO-outbound-operations.
3. */
4. public interface ChannelOutboundHandler extends ChannelHandler {
5. /**
6. * Called once a bind operation is made.
7. *
8. * @param ctx the {@link ChannelHandlerContext} for which the bind operation is made
9. * @param localAddress the {@link SocketAddress} to which it should bound
10. * @param promise the {@link ChannelPromise} to notify once the operation completes
11. * @throws Exception thrown if an error occurs
12. */
13. void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
14.
15. /**
16. * Called once a connect operation is made.
17. *
18. * @param ctx the {@link ChannelHandlerContext} for which the connect operation is made
19. * @param remoteAddress the {@link SocketAddress} to which it should connect
20. * @param localAddress the {@link SocketAddress} which is used as source on connect
21. * @param promise the {@link ChannelPromise} to notify once the operation completes
22. * @throws Exception thrown if an error occurs
23. */
24. void connect(
25. ChannelHandlerContext ctx, SocketAddress remoteAddress,
26. SocketAddress localAddress, ChannelPromise promise) throws Exception;
27.
28. /**
29. * Called once a disconnect operation is made.
30. *
31. * @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made
32. * @param promise the {@link ChannelPromise} to notify once the operation completes
33. * @throws Exception thrown if an error occurs
34. */
35. void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
36.
37. /**
38. * Called once a close operation is made.
39. *
40. * @param ctx the {@link ChannelHandlerContext} for which the close operation is made
41. * @param promise the {@link ChannelPromise} to notify once the operation completes
42. * @throws Exception thrown if an error occurs
43. */
44. void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
45.
46. /**
47. * Called once a deregister operation is made from the current registered {@link EventLoop}.
48. *
49. * @param ctx the {@link ChannelHandlerContext} for which the close operation is made
50. * @param promise the {@link ChannelPromise} to notify once the operation completes
51. * @throws Exception thrown if an error occurs
52. */
53. void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
54.
55. /**
56. * Intercepts {@link ChannelHandlerContext#read()}.
57. */
58. void read(ChannelHandlerContext ctx) throws Exception;
59.
60. /**
61. * Called once a write operation is made. The write operation will write the messages through the
62. * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
63. * {@link Channel#flush()} is called
64. *
65. * @param ctx the {@link ChannelHandlerContext} for which the write operation is made
66. * @param msg the message to write
67. * @param promise the {@link ChannelPromise} to notify once the operation completes
68. * @throws Exception thrown if an error occurs
69. */
70. void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
71.
72. /**
73. * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
74. * that are pending.
75. *
76. * @param ctx the {@link ChannelHandlerContext} for which the flush operation is made
77. * @throws Exception thrown if an error occurs
78. */
79. void flush(ChannelHandlerContext ctx) throws Exception;
80. }
如果将 ChannelPipeline
视为 ChannelHandler
实例链,可拦截流经通道的入站和出站事件,即可明白 ChannelHandler
之间的交互是如何构成应用程序数据和事件处理逻辑的核心的。当创建一个新的Channel时,都会分配了一个新的 ChannelPipeline
,该关联是永久的,该通道既不能附加另一个 ChannelPipeline
也不能分离当前的 ChannelPipeline
。
一个事件要么被 ChannelInboundHander
处理,要么被 ChannelOutboundHandle
r处理,随后,它将通过调用 ChannelHandlerContext
的实现来将事件转发至同一超类型的下一个处理器。 ChannelHandlerContext
允许 ChannelHandler
与其 ChannelPipeline
和其他 ChannelHandler
进行交互,一个处理器可以通知 ChannelPipeline
中的下一个处理器,甚至可以修改器隶属于的 ChannelPipeline
。
下图展示了 ChannelHandlerPipeline
、 ChannelInboundHandler
和 ChannelOutboundHandler
之间的关系
可以看到 ChannelPipeline
是由一系列 ChannelHandlers
组成,其还提供了通过自身传播事件的方法,当进站事件触发时,其从 ChannelPipeline
的头部传递到尾部,而出站事件会从右边传递到左边。
当管道传播事件时,其会确定下一个 ChannelHandler
的类型是否与移动方向匹配,若不匹配,则会跳过并寻找下一个,直至找到相匹配的 ChannelHandler
(一个处理器可以会同时实现 ChannelInboundHandler
和 ChannelOutboundHandler
)。
ChannelHandlerContext
代表了 ChannelHandler
与 ChannelPipeline
之间的关联,当 ChannelHandler
被添加至 ChannelPipeline
中时其被创建, ChannelHandlerContext
的主要功能是管理相关 ChannelHandler
与同一 ChannelPipeline
中的其他 ChannelHandler
的交互。
ChannelHandlerContext
中存在很多方法,其中一些也存在于 ChannelHandler
和 ChannelPipeline
中,但是差别很大。如果在 ChannelHandler
或者 ChannelPipeline
中调用该方法,它们将在整个管道中传播,而如果在 ChannelHandlerContext
中调用方法,那么会仅仅传递至下个能处理该事件的 ChannelHandler
。
通过上图我们可以看到,一个 Channel
包含了一个 ChannelPipeline
,而 ChannelPipeline
中又维护了一个由 ChannelHandlerContext
组成的双向链表。这个链表的头是 HeadContext
,链表的尾是 TailContext
,并且每个 ChannelHandlerContext
中又关联着一个 ChannelHandler
。
前面已经知道了一个 Channel
的初始化的基本过程,下面再回顾一下 下面的代码是 AbstractChannel
构造器
1. protected AbstractChannel(Channel parent) {
2. this.parent = parent;
3. unsafe = newUnsafe();
4. pipeline = new DefaultChannelPipeline(this);
5. }
AbstractChannel
有一个 pipeline
字段,在构造器中会初始化它为 DefaultChannelPipeline
的实例,这里的代码就印证了一点:每个 Channel
都有一个 ChannelPipeline
主要介绍ChannelPipeline和ChannelHandler基本概念以及在netty中的作用。有点类似于加工厂的流水线 Channel
。
ChannelPipeline
相当于流水线的传送带
ChannelHandler
流水线上的每个步骤工人
ChannelHandlerContext
就是待加工的产品
这条流水线有个特点是双向的。
Netty实战.pdf https://www.cnblogs.com/leesf456/p/6901189.html https://www.jianshu.com/p/33311b4cab30 https://www.jianshu.com/u/fc9c660e9843