前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Netty】ChannelPipeline和ChannelHandler(一)

【Netty】ChannelPipeline和ChannelHandler(一)

作者头像
用户3467126
修改2019-07-03 19:42:05
6000
修改2019-07-03 19:42:05
举报
文章被收录于专栏:爱编码爱编码

简介

前文学习Netty的ByteBuf数据容器。本文开始学习ChannelPipeline和ChannelHandler,它们的角色非常类似于流水生产线。

Channel的生命周期

Channel接口定义了一个简单但是强大的状态模型,该模型与ChannelInboundHandler API紧密联系。

ChannelInboundHandler源码如下

代码语言:javascript
复制
public interface ChannelInboundHandler extends ChannelHandler {
    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
     void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
      */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    /**
      * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
    /**
      * Invoked when the current {@link Channel} has read a message from the peer.
       */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
      * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */ 
   void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;}

其中标记Channel生命周期状态的方法如下:

以下列出Channel的4个状态:

ChannelUnregistered:Channel已经被创建,但还未注册到EventLoopChannelRegistered:Channel已经被注册到了EventLoopChannelActive:Channel处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了ChannelInactive:Channel没有连接到远程节点

Channel的正常生命周期如下图所示

当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline中的ChannelHandler,其可以随后对它们做出响应。

ChannelHandler的生命周期

ChannelHandler被添加到 ChannelPipeline中或者被从 ChannelPipeline中移除时会调用这些操作,这些方法中的每一个都接受一个 ChannelHandlerContext参数。

ChannelHandler源码如下:

代码语言:javascript
复制
public interface ChannelHandler {
    /**
     * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
     */
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    /**
     * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
     * anymore.
     */
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    /**
     * Gets called if a {@link Throwable} was thrown.
     *
     * @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
     * implement the method there.
     */
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    /**
     * Indicates that the same instance of the annotated {@link ChannelHandler}
     * can be added to one or more {@link ChannelPipeline}s multiple times
     * without a race condition.
     * <p>
     * If this annotation is not specified, you have to create a new handler
     * instance every time you add it to a pipeline because it has unshared
     * state such as member variables.
     * <p>
     * This annotation is provided for documentation purpose, just like
     * <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
      */
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

ChannelHandler的生命周期方法解析:

handlerAdded:当把 ChannelHandler添加到 ChannelPipeline中时被调用handlerRemoved:当从 ChannelPipeline中移除 ChannelHandler时被调用exceptionCaught:当处理过程中在 ChannelPipeline中有错误产生时被调用

Netty定义了下面两个重要的ChannelHandler子接口:

ChannelInboundHandler——处理入站数据以及各种状态变化ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作

ChannelHandler之ChannelInboundHandler

ChannelInboundHandler接口生命周期中的方法,当接受到数据或者其对应的 Channel的状态发生变化则会调用方法

ChannelInboundHandler源码如下:

代码语言:javascript
复制
1.	public interface ChannelInboundHandler extends ChannelHandler {
2.	
3.	    /**
4.	     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
5.	     */
6.	    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
7.	
8.	    /**
9.	     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
10.	     */
11.	    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
12.	
13.	    /**
14.	     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
15.	     */
16.	    void channelActive(ChannelHandlerContext ctx) throws Exception;
17.	
18.	    /**
19.	     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
20.	     * end of lifetime.
21.	     */
22.	    void channelInactive(ChannelHandlerContext ctx) throws Exception;
23.	
24.	    /**
25.	     * Invoked when the current {@link Channel} has read a message from the peer.
26.	     */
27.	    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
28.	
29.	    /**
30.	     * Invoked when the last message read by the current read operation has been consumed by
31.	     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
32.	     * attempt to read an inbound data from the current {@link Channel} will be made until
33.	     * {@link ChannelHandlerContext#read()} is called.
34.	     */
35.	    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
36.	
37.	    /**
38.	     * Gets called if an user event was triggered.
39.	     */
40.	    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
41.	
42.	    /**
43.	     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
44.	     * {@link Channel#isWritable()}.
45.	     */
46.	    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
47.	
48.	    /**
49.	     * Gets called if a {@link Throwable} was thrown.
50.	     */
51.	    @Override
52.	    @SuppressWarnings("deprecation")
53.	    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
54.	}

当某个 ChannelInboundHandler的实现重写 channelRead()方法时,它将负责显式地释放与池化 ByteBuf实例相关的内存,Netty为此提供了一个实用方法 ReferenceCountUtil.release()

代码语言:javascript
复制
1.	@ChannelHandler.Sharable
2.	//扩展了ChannelInboundHandlerAdapter
3.	public class DiscardHandler extends ChannelInboundHandlerAdapter{
4.	
5.	    @Override
6.	    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
7.	        //丢弃已接收的消息
8.	        ReferenceCountUtil.release(msg);
9.	    }
10.	}

当你不想处理这些释放资源等操作的话,Netty提供了SimpleChannelInboundHandler会自动释放资源,因此无需显式释放,代码如下:

代码语言:javascript
复制
1.	public class MySimpleHandler
2.	    extends SimpleChannelInboundHandler<Object> {
3.	    @Override
4.	    public void channelRead0(ChannelHandlerContext ctx,
5.	        Object msg) {
6.	            // No need to do anything special
7.	    }
8.	}

为了更好地理解,我们看看SimpleChannelInboundHandler的源码,从中可以看到,它已经帮我们释放资源 了的,我们只需要实现channelRead0方法,在channelRead0()处理我们的业务逻辑即可。

代码语言:javascript
复制
1.	public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
2.	//省略了很多代码。。。。。。
3.	  @Override
4.	    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
5.	        boolean release = true;
6.	        try {
7.	            if (acceptInboundMessage(msg)) {
8.	                @SuppressWarnings("unchecked")
9.	                I imsg = (I) msg;
10.	                channelRead0(ctx, imsg);
11.	            } else {
12.	                release = false;
13.	                ctx.fireChannelRead(msg);
14.	            }
15.	        } finally {
16.	            if (autoRelease && release) {
17.	                ReferenceCountUtil.release(msg);
18.	            }
19.	        }
20.	    }
21.	//省略了很多代码。。。。。。
22.	
23.	    protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
24.	
25.	}

ChannelHandler之ChannelOutboundHandler

出站操作和数据将由 ChannelOutboundHandler处理,它的方法将被 ChannelChannelPipeline以及 ChannelHandlerContext调用。

ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷并在稍后继续。

ChannelPromise与ChannelFuture : ChannelOutboundHandler中的大部分方法都需要一个 ChannelPromise参数,以便在操作完成时得到通知。 ChannelPromiseChannelFuture的一个子类,其定义了一些可写的方法,如 setSuccess()setFailure(),从而使ChannelFuture不可变。

ChannelOutboundHandler源码如下:

代码语言:javascript
复制
1.	/**
2.	 * {@link ChannelHandler} which will get notified for IO-outbound-operations.
3.	 */
4.	public interface ChannelOutboundHandler extends ChannelHandler {
5.	    /**
6.	     * Called once a bind operation is made.
7.	     *
8.	     * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
9.	     * @param localAddress  the {@link SocketAddress} to which it should bound
10.	     * @param promise       the {@link ChannelPromise} to notify once the operation completes
11.	     * @throws Exception    thrown if an error occurs
12.	     */
13.	    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
14.	
15.	    /**
16.	     * Called once a connect operation is made.
17.	     *
18.	     * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
19.	     * @param remoteAddress     the {@link SocketAddress} to which it should connect
20.	     * @param localAddress      the {@link SocketAddress} which is used as source on connect
21.	     * @param promise           the {@link ChannelPromise} to notify once the operation completes
22.	     * @throws Exception        thrown if an error occurs
23.	     */
24.	    void connect(
25.	            ChannelHandlerContext ctx, SocketAddress remoteAddress,
26.	            SocketAddress localAddress, ChannelPromise promise) throws Exception;
27.	
28.	    /**
29.	     * Called once a disconnect operation is made.
30.	     *
31.	     * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
32.	     * @param promise           the {@link ChannelPromise} to notify once the operation completes
33.	     * @throws Exception        thrown if an error occurs
34.	     */
35.	    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
36.	
37.	    /**
38.	     * Called once a close operation is made.
39.	     *
40.	     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
41.	     * @param promise           the {@link ChannelPromise} to notify once the operation completes
42.	     * @throws Exception        thrown if an error occurs
43.	     */
44.	    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
45.	
46.	    /**
47.	     * Called once a deregister operation is made from the current registered {@link EventLoop}.
48.	     *
49.	     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
50.	     * @param promise           the {@link ChannelPromise} to notify once the operation completes
51.	     * @throws Exception        thrown if an error occurs
52.	     */
53.	    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
54.	
55.	    /**
56.	     * Intercepts {@link ChannelHandlerContext#read()}.
57.	     */
58.	    void read(ChannelHandlerContext ctx) throws Exception;
59.	
60.	    /**
61.	    * Called once a write operation is made. The write operation will write the messages through the
62.	     * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
63.	     * {@link Channel#flush()} is called
64.	     *
65.	     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
66.	     * @param msg               the message to write
67.	     * @param promise           the {@link ChannelPromise} to notify once the operation completes
68.	     * @throws Exception        thrown if an error occurs
69.	     */
70.	    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
71.	
72.	    /**
73.	     * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
74.	     * that are pending.
75.	     *
76.	     * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
77.	     * @throws Exception        thrown if an error occurs
78.	     */
79.	    void flush(ChannelHandlerContext ctx) throws Exception;
80.	}

ChannelPipeline接口

如果将 ChannelPipeline视为 ChannelHandler实例链,可拦截流经通道的入站和出站事件,即可明白 ChannelHandler之间的交互是如何构成应用程序数据和事件处理逻辑的核心的。当创建一个新的Channel时,都会分配了一个新的 ChannelPipeline,该关联是永久的,该通道既不能附加另一个 ChannelPipeline也不能分离当前的 ChannelPipeline

  一个事件要么被 ChannelInboundHander处理,要么被 ChannelOutboundHandler处理,随后,它将通过调用 ChannelHandlerContext的实现来将事件转发至同一超类型的下一个处理器。 ChannelHandlerContext允许 ChannelHandler与其 ChannelPipeline和其他 ChannelHandler进行交互,一个处理器可以通知 ChannelPipeline中的下一个处理器,甚至可以修改器隶属于的 ChannelPipeline

 下图展示了 ChannelHandlerPipelineChannelInboundHandlerChannelOutboundHandler之间的关系

  可以看到 ChannelPipeline是由一系列 ChannelHandlers组成,其还提供了通过自身传播事件的方法,当进站事件触发时,其从 ChannelPipeline的头部传递到尾部,而出站事件会从右边传递到左边。

  当管道传播事件时,其会确定下一个 ChannelHandler的类型是否与移动方向匹配,若不匹配,则会跳过并寻找下一个,直至找到相匹配的 ChannelHandler(一个处理器可以会同时实现 ChannelInboundHandlerChannelOutboundHandler)。

ChannelHandlerContext接口

ChannelHandlerContext代表了 ChannelHandlerChannelPipeline之间的关联,当 ChannelHandler被添加至 ChannelPipeline中时其被创建, ChannelHandlerContext的主要功能是管理相关 ChannelHandler与同一 ChannelPipeline中的其他 ChannelHandler的交互。

ChannelHandlerContext中存在很多方法,其中一些也存在于 ChannelHandlerChannelPipeline中,但是差别很大。如果在 ChannelHandler或者 ChannelPipeline中调用该方法,它们将在整个管道中传播,而如果在 ChannelHandlerContext中调用方法,那么会仅仅传递至下个能处理该事件的 ChannelHandler

问题:ChannelPipeline在哪里创建的呢?

通过上图我们可以看到,一个 Channel包含了一个 ChannelPipeline,而 ChannelPipeline中又维护了一个由 ChannelHandlerContext组成的双向链表。这个链表的头是 HeadContext,链表的尾是 TailContext,并且每个 ChannelHandlerContext中又关联着一个 ChannelHandler

前面已经知道了一个 Channel的初始化的基本过程,下面再回顾一下 下面的代码是 AbstractChannel构造器

代码语言:javascript
复制
1.	protected AbstractChannel(Channel parent) {
2.	    this.parent = parent;
3.	    unsafe = newUnsafe();
4.	    pipeline = new DefaultChannelPipeline(this);
5.	}

AbstractChannel有一个 pipeline字段,在构造器中会初始化它为 DefaultChannelPipeline的实例,这里的代码就印证了一点:每个 Channel都有一个 ChannelPipeline

总结

主要介绍ChannelPipeline和ChannelHandler基本概念以及在netty中的作用。有点类似于加工厂的流水线 Channel

ChannelPipeline相当于流水线的传送带

ChannelHandler流水线上的每个步骤工人

ChannelHandlerContext就是待加工的产品

这条流水线有个特点是双向的

参考文章

Netty实战.pdf https://www.cnblogs.com/leesf456/p/6901189.html https://www.jianshu.com/p/33311b4cab30 https://www.jianshu.com/u/fc9c660e9843

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 爱编码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Channel的生命周期
  • ChannelHandler的生命周期
    • ChannelHandler之ChannelInboundHandler
      • ChannelHandler之ChannelOutboundHandler
      • ChannelPipeline接口
      • ChannelHandlerContext接口
      • 问题:ChannelPipeline在哪里创建的呢?
      • 总结
        • 参考文章
        相关产品与服务
        容器服务
        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档