前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty ChannelHandler与ChannelPipeline源码解读

Netty ChannelHandler与ChannelPipeline源码解读

作者头像
良辰美景TT
发布2018-09-11 14:26:30
4880
发布2018-09-11 14:26:30
举报

ChannelHandler

  ChannelHandler基本上是我们第一次接触Netty就会碰到的对象,我们自定义的各种ChannelHandler主要用于处理我们系统的各种业务逻辑,比如发生了active事件后的处理逻辑,发生了读事件的处理逻辑,下面先来看一下ChannelHandler的类继承图:

image.png

  ChannelHandler被分为两部分,分别为ChannelOutboundHandler与ChannelInboundHandler。其中ChannelInboundHandler提供了一些方法再接收数据或Channel状态改变时被调用,ChannelOutboundHandler则提供了与网络I/O相关的方法。 同时Netty也提供了相应的Adapter,主要是为了我们编码的方便,我们可以通过继承Adapter,这样ChannelHandler里便只需要关注需要重写的方法。而不是实现所有接口的方法。

StringDecoder源码

  我们来关注一下StringDecoder这个类,StringDecoder用于对读入的数据根据指定的字符编码进行转换。StringDecoder继承MessageToMessageDecoder,而MessageToMessageDecoder继承ChannelInboundHandlerAdapter。StringDecoder便是一个典型的ChannelInboundHandler啦,先来看看MessageToMessageDecoder里都有那些内容,源码如下:

代码语言:javascript
复制
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {

//matcher用于检验是否对msg进行Decoder
    private final TypeParameterMatcher matcher;

    /**
     * Create a new instance which will try to detect the types to match out of the type parameter of the class.
     */
    protected MessageToMessageDecoder() {
        matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
    }

    /**
     * Create a new instance
     *
     * @param inboundMessageType    The type of messages to match and so decode
     */
    protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
        matcher = TypeParameterMatcher.get(inboundMessageType);
    }

    /**
     * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
     * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     */
    public boolean acceptInboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

//可以看出MessageToMessageDecoder只对 channelRead进行了重写,这就是Adapter提供的好处
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//这里的out是个list对象
        CodecOutputList out = CodecOutputList.newInstance();
        try {
//acceptInboundMessage判断是否对msg进行解析
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
//这是个留给子类实现的方法啦, 也就是我们的StringDecoder里会实现的方法啦
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            int size = out.size();
//对out里的对象触发fireChannelRead,让其它的channelhandler处理
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.getUnsafe(i));
            }
            out.recycle();
        }
    }


    protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

MessageToMessageDecoder方法只做了两件事:1:判断当前个对象是否需要调用decode方法,2:将decode结果的对象调用fireChannelRead方法交给其它的ChannelHandler处理。StringDecoder类里的方法就更简单了,源码如下:

代码语言:javascript
复制
@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {

    // TODO Use CharsetDecoder instead.
//传入字节码
    private final Charset charset;

    /**
     * Creates a new instance with the current system character set.
     */
    public StringDecoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public StringDecoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//这里对msg进行处理
        out.add(msg.toString(charset));
    }
}

StringEncoder源码

  我们再来关注一下StringEncoder的处理流程,StringEncoder用于对需要写的数据进行字符编码,StringEncoder继承自MessageToMessageEncoder,而MessageToMessageEncoder又继承ChannelOutboundHandlerAdapter。下面是MessageToMessageEncoder的源码:

代码语言:javascript
复制
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {

    private final TypeParameterMatcher matcher;

    /**
     * Create a new instance which will try to detect the types to match out of the type parameter of the class.
     */
    protected MessageToMessageEncoder() {
        matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
    }

    /**
     * Create a new instance
     *
     * @param outboundMessageType   The type of messages to match and so encode
     */
    protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
        matcher = TypeParameterMatcher.get(outboundMessageType);
    }

    /**
     * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
     * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     */
    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }

//只需要关注这个方法啦,这里会对面要写的数据进行encode
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        CodecOutputList out = null;
        try {
//跟上面decode一样,需要验证msg能不能处理
            if (acceptOutboundMessage(msg)) {
                out = CodecOutputList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
//具体的encode留给子类处理
                    encode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (out.isEmpty()) {
                    out.recycle();
                    out = null;
                    throw new EncoderException(
                            StringUtil.simpleClassName(this) + " must produce at least one message.");
                }
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new EncoderException(t);
        } finally {
//out不为空的话,就会调用ctx的witer方法触发写数据的逻辑啦
            if (out != null) {
                final int sizeMinusOne = out.size() - 1;
                if (sizeMinusOne == 0) {
                    ctx.write(out.get(0), promise);
                } else if (sizeMinusOne > 0) {
                    // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
                    // See https://github.com/netty/netty/issues/2525
                    ChannelPromise voidPromise = ctx.voidPromise();
                    boolean isVoidPromise = promise == voidPromise;
                    for (int i = 0; i < sizeMinusOne; i ++) {
                        ChannelPromise p;
                        if (isVoidPromise) {
                            p = voidPromise;
                        } else {
                            p = ctx.newPromise();
                        }
                        ctx.write(out.getUnsafe(i), p);
                    }
                    ctx.write(out.getUnsafe(sizeMinusOne), promise);
                }
                out.recycle();
            }
        }
    }

   
    protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

  MessageToMessageEncoder类里也只做了三件事:1:判断当前的对象是否需要进行encoder。2:调用子类encoder方法对对象进行encoder。3:将encoder好了的对象调用发送逻辑。下面是StringEncoder源码:

代码语言:javascript
复制
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {

    // TODO Use CharsetEncoder instead.
    private final Charset charset;

    /**
     * Creates a new instance with the current system character set.
     */
    public StringEncoder() {
        this(Charset.defaultCharset());
    }

    /**
     * Creates a new instance with the specified character set.
     */
    public StringEncoder(Charset charset) {
        if (charset == null) {
            throw new NullPointerException("charset");
        }
        this.charset = charset;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
        if (msg.length() == 0) {
            return;
        }
//根据 charset将String转成ByteBuf对象
        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
    }
}

ChannelPipeline

  ChannelPipeline用于组织ChannelHandlerContext(内部含有ChannelHandler),在Netty里采用的是双端链表的方式来管理ChannelHandlerContext。在ChannelPipeline里提供了各种对双端链表处理的方法,同时也提供了各种触发ChannelHandlerContext的方法,比如:fireChannelActive方法,下面是部分源码:

代码语言:javascript
复制
public class DefaultChannelPipeline implements ChannelPipeline {

//双端链表的head对象
    final AbstractChannelHandlerContext head;
//双端链表的tail对象
    final AbstractChannelHandlerContext tail;
//持用的channel对象
    private final Channel channel;

    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();

    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private volatile MessageSizeEstimator.Handle estimatorHandle;
    private boolean firstRegistration = true;

//的链表的未位增加一个ChannelHandler 
    public final ChannelPipeline addLast(ChannelHandler handler) {
        return addLast(null, handler);
    }

//的链表的未位增加一个ChannelHandler ,需要传入这个ChannelHandler的名称
    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

//最终会调用到这个方法来对channelHandler处理
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
//这是一个同步方法,需要锁住这个pipeline对象
        synchronized (this) {
//参数合法性验证
            checkMultiplicity(handler);
//这里会将ChannelHandler 包装成ChannelHandlerContext对象,这也就是为什么双端链表里存的是ChannelHandlerContext啦其中filterName会对为null的name生成一个名称
            newCtx = newContext(group, filterName(name, handler), handler);
//这里才是具体处理链表的方法啦
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.

            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
//下面的方法是对链表进行操作的代码
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

//pipeline里提供了类似fireChannelActive方法,这些方法最络会调用到channelHandler对应的方法上
    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
}

ChannelHandlerContext

  ChannelHandlerContext对于连接ChannelHandler与ChannelPipeline。 ChannelHandlerContext内部持有ChannelHandler对象,同时又是ChannelPipeline链表里的节点,串起了ChannelPipeline的整个逻辑,下面来看看ChannelHandlerContext最重要的类AbstractChannelHandlerContext源码:

代码语言:javascript
复制
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
//当前ChannelHandlerContext指向的下一个ChannelHandlerContext
    volatile AbstractChannelHandlerContext next;
//当前ChannelHandlerContext指向的前一个ChannelHandlerContext
    volatile AbstractChannelHandlerContext prev;
//用于标识channelHanlder是否为inbound
    private final boolean inbound;
//用于标识channelHanlder是否为outbound
    private final boolean outbound;
//同时也持胡pipeline对象
    private final DefaultChannelPipeline pipeline;
//channelHandler取的名称
    private final String name;
//是否需要排序
    private final boolean ordered;

//构造方法如下
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

//这个方法是个static方法,用于给pipeline对象调用,
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
//会触发ChannelHandlerContext的invokeChannelActive方法
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

//active的逻辑会调用到这个方法里
    private void invokeChannelActive() {
//确认当前channelhandler的状态 
        if (invokeHandler()) {
            try {
//最络会调用到channelhandler的channelActive方法,其中handler()方法是留给子类实现的可以看DefaultChannelHandlerContext源码部分
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

}

DefaultChannelHandlerContext源码就很简单了,提供了一个handler方法用于得到当前的ChannelHandler和判断当前ChannelHandler的类型。代码如下:

代码语言:javascript
复制
package io.netty.channel;

import io.netty.util.concurrent.EventExecutor;

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

ChannelHandlerContext里作为ChannelPipeline的链表节点,决定着事件是否进行向下流转,如果想让事件向下流转,只需要通过ChannelHandlerContext调用相应的fire方法就行了

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.07.24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ChannelHandler
  • StringDecoder源码
  • StringEncoder源码
  • ChannelPipeline
  • ChannelHandlerContext
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档