前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty的ChannelPipline传播源码解析

Netty的ChannelPipline传播源码解析

作者头像
止术
发布2021-08-06 14:29:09
5200
发布2021-08-06 14:29:09
举报
文章被收录于专栏:求道求道

上一章节 ,我们基本讲述了Netty对于新连接接入与管道数据处理的源码解析,大家可能发现,在我们前面学习中,涉及到了很多的有关pipeline的操作,在前面介绍这些的时候,我为了保证主线逻辑的清晰,基本都是一概而过,本章节将对pipeline做一个详细的讲解!

一、基础铺垫

1. JAVA中的基本位运算符

运算符

描述

&

|

~

^

异或

<<

左移

>>

右移

2. 位运算解释与实例

&(与)

十进制

二进制

3

0 0 1 1

5

0 1 0 1

& 后结果:1

0 0 0 1

即:对应位都为 1 时,才为 1,否则全为 0。

|(或)

十进制

二进制

3

0 0 1 1

5

0 1 0 1

| 后结果 :7

0 1 1 1

即:对应位只要有 1 时,即为 1,否则全为 0。

~(非)

十进制

二进制

3

0 0 1 1

~ 后结果:12

1 1 0 0

即:对应位取反。

异或 ^

十进制

二进制

3

0 0 1 1

5

0 1 0 1

^ 后结果:6

0 1 1 0

即:只要对应为不同即为 1。

3. 配合Netty实例

我们在以往学习Netty中见到过类似于以下代码:

代码语言:javascript
复制
selectionKey.interestOps(interestOps | readInterestOp);

我们重点关注位运算:interestOps | readInterestOp

该行代码的意思是位运算计算一个数字,该数字包含 | 前后的数字!

代码语言:javascript
复制
//初始化一个值
int interestOps = 0;
//给当前这个值增加一个可读事件
interestOps |= OP_READ;
//给当前的值增加一个可写的事件
interestOps |= OP_WRITE;
//判断当前的事件是不是包含可读事件 true
boolean isRead = (interestOps & OP_READ) == OP_READ;
//判断当前的事件是不是不包含可读事件 false
boolean isRead = (interestOps & OP_READ) == 0;
//剔除可读事件
interestOps &= ~OP_READ;
//剔除可写事件
interestOps &= ~OP_WRITE;

二、源码解析

1. 创建管道

io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline

代码语言:javascript
复制
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
	//创建一个管道上下文  尾部节点
    tail = new TailContext(this);
    //创建一个管道上下文  头部节点
    head = new HeadContext(this);
	//头部节点的下一个节点设置为尾部节点
    head.next = tail;
    //尾部节点的上一个节点设置为头部节点
    tail.prev = head;
}

可以看到,这里初始化管道的时候,管道内部存在两个Handler tail和head节点,两个节点组成双向链表!

image-20210505233350257

2. 向通道内添加一个Handler处理器

代码语言:javascript
复制
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
        super.channelRegistered(ctx);
    }
});

上述代码再一个Netty开发中是很常见的一个代码,这里向通道内添加了一个 ChannelInboundHandlerAdapter,我们进入到addLast方法:

代码语言:javascript
复制
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

//进入到 addLast
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ObjectUtil.checkNotNull(handlers, "handlers");

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}

//进入到 addLast(executor, null, h);
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        //验证是否重复添加改handler
        checkMultiplicity(handler);
		//将handler封装为上下文对象
        newCtx = newContext(group, filterName(name, handler), handler);
        //将该节点添加到双向链表中
        addLast0(newCtx);
        ........................忽略其他代码..............
    }
    ........................忽略其他代码..............
    return this;
}

这里总共分为两步:

验证Handler是否被重复添加

代码语言:javascript
复制
checkMultiplicity(handler);
代码语言:javascript
复制
private static void checkMultiplicity(ChannelHandler handler) {
    //验证是不是 ChannelHandlerAdapter 类型的,如果不是直接忽略
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        //如果不是可共享的而且是已经添加过的直接报错
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                h.getClass().getName() +
                " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        //如果是可共享的或者未添加的,将该handler内的 added属性设置为true证明该handler已经被添加
        h.added = true;
    }
}

他是如何判断是否被添加过的呢?

每一个Handler中都存在一个 added属性,当这个属性为true的时候,证明这个Handler已经被添加过了,Netty常规情况下为了考虑线程安全问题,是不允许一个Handler被重复的使用的!

但是我们有时候会有这样一个需求,Handler的功能比较类似,而且我们通过代码手段,避免了线程安全问题,所以又想重复添加Handler,Netty提供了一个注解 @Sharable注解,当存在该注解的时候,证明这个Handler是可以被复用的,可以被重复添加!

所以,checkMultiplicity方法通过判断类是否增加了@Sharable注解和added属性是否为空来验证Handle是否违规重复添加了!

当验证通过之后,将added设置为true,证明这个Handler已经被添加过了!

将Handler封装为包装对象

代码语言:javascript
复制
newCtx = newContext(group, filterName(name, handler), handler);

这里比较难理解的就是这个,我们进入到newContext方法里面:

代码语言:javascript
复制
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

进入到 DefaultChannelHandlerContext类的源码里面:

代码语言:javascript
复制
DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    //调用父类进行掩码计算
    super(pipeline, executor, name, handler.getClass());
    //保存一个handler
    this.handler = handler;
}

这里除了会保存一个handler还会调用父类,我们介入到父类里面:

代码语言:javascript
复制
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    //标识 是in还是out
    this.executionMask = mask(handlerClass);
    // 如果由EventLoop或给定的Executor驱动的驱动程序是OrderedEventExecutor的实例,则其顺序为。
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

这里会保存一些属性,这些属性都是我们前面讲过的,大家自行分析下,我们重点关注掩码的计算:

代码语言:javascript
复制
this.executionMask = mask(handlerClass);
代码语言:javascript
复制
static int mask(Class<? extends ChannelHandler> clazz) {
    //直接再缓存中取出
    Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
    Integer mask = cache.get(clazz);
    //缓存中不存在
    if (mask == null) {
        mask = mask0(clazz);
        cache.put(clazz, mask);
    }
    return mask;
}

先从缓存中取出,如果不存在就调用 mask0(clazz); 方法计算,然后再放进缓存,我们进入到mask0(clazz);方法:

代码语言:javascript
复制
private static int mask0(Class<? extends ChannelHandler> handlerType) {
        int mask = MASK_EXCEPTION_CAUGHT;
        try {
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
                // 如果是 ChannelInboundHandler 实例,所有 Inbound 事件置为 1
                mask |= MASK_ALL_INBOUND;
                //判断是否存在Skip注解   如果催你在这个跳过的注解  就移除这个
                if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_REGISTERED;
                }
                ..................忽略类似的代码.....................
            }

            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                mask |= MASK_ALL_OUTBOUND;

                if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_BIND;
                }
                ..................忽略类似的代码.....................
            }
        } catch (Exception e) {
            ..................忽略异常的代码.....................
        }

        return mask;
    }

这会区分两种情况,一种是ChannelInboundHandler类型的,一种是ChannelOutboundHandler类型的,二者逻辑相同,我们以ChannelInboundHandler为例:

首先,再ChannelHandlerMask类里面定义了很多的预设掩码值:

代码语言:javascript
复制
/**
     * 以下是方法代表的掩码值
     */
    static final int MASK_EXCEPTION_CAUGHT = 1;
    /**
     * channelRegistered方法的掩码
     */
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    /**
     * channelUnregistered方法的掩码
     */
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
	/**
	* 后面的以此类推
	*/
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_INACTIVE = 1 << 4;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
    /**
     * bind方法的掩码
     */
    static final int MASK_BIND = 1 << 9;
    /**
     * connect方法的掩码
     */
    static final int MASK_CONNECT = 1 << 10;
	/**
	* 后面的以此类推
	*/
    static final int MASK_DISCONNECT = 1 << 11;
    static final int MASK_CLOSE = 1 << 12;
    static final int MASK_DEREGISTER = 1 << 13;
    static final int MASK_READ = 1 << 14;
    static final int MASK_WRITE = 1 << 15;
    static final int MASK_FLUSH = 1 << 16;

    /**
     * 包含全部 Inbound方法的掩码
     */
    private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
            MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
            MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;

    /**
     * 包含全部 outbound方法的掩码
     */
    private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
            MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;

我们回到 mask0方法:

代码语言:javascript
复制
mask |= MASK_ALL_INBOUND;

一开始,我们会直接将一个handler的掩码计算为拥有全部方法的掩码!

代码语言:javascript
复制
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
    mask &= ~MASK_CHANNEL_REGISTERED;
}

判断该方法是否存在 @Skip注解,如果存在就排除掉这个掩码!

整个逻辑执行完毕后,这个掩码就只会包含handler中没有被@Sikp注解注解的方法掩码!

有同学可能疑问,我在书写handler的时候并没有增加@Sikp注解呀! 我们都知道,实现一个Handler就必定需要继承 ChannelInboundHandlerAdapter或者ChannelOutboundHandlerAdapter, 我们随便挑一个类进去看:

image-20210506002314359

可以看到,这些方法其实都是被默认添加了的,只不过我们重写之后没添加!现在我们明白,handler是如何区分你实现了那些方法的了!

这里会将handler包装为HandlerContext对象,类似于tailContext和HeadContext一样,此时上下文对象的结构如下:

image-20210506002420080

将HandlerContext添加进pipeline中:

代码语言:javascript
复制
addLast0(newCtx);
代码语言:javascript
复制
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

image-20210506000354375

整个过程如上,无非就是指针指向地址的变换,比较简单,不做深入分析!

3. 删除一个处理器

代码语言:javascript
复制
ch.pipeline().remove("xxxxxx")
代码语言:javascript
复制
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    return this;
}

寻找处理器Handler的上下文

代码语言:javascript
复制
getContextOrDie(handler)
代码语言:javascript
复制
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    //寻找handler
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

//context(handler);
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    ObjectUtil.checkNotNull(handler, "handler");

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }
		//循环迭代 判断是否寻找到这个handler
        if (ctx.handler() == handler) {
            //返回这个handler的上下文对象
            return ctx;
        }

        ctx = ctx.next;
    }
}

删除这个处理器

代码语言:javascript
复制
remove(getContextOrDie(handler));
代码语言:javascript
复制
private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
    if (ctx == null) {
        return null;
    }
    return (T) remove((AbstractChannelHandlerContext) ctx).handler();
}

//直接进入到  删除Handler的主要逻辑
//(T) remove((AbstractChannelHandlerContext) ctx).handler();
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    //首先删除的handler不是tail和尾节点
    assert ctx != head && ctx != tail;

    synchronized (this) {
        //删除上下文对象
        atomicRemoveFromHandlerList(ctx);
		................忽略....................

        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    //回调handlerRemoved方法
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx;
        }
    }
    callHandlerRemoved0(ctx);
    return ctx;
}

首先我们关注 atomicRemoveFromHandlerList(ctx);

代码语言:javascript
复制
private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
    //获取该节点的上级节点
    AbstractChannelHandlerContext prev = ctx.prev;
    //获取该节点的下级节点
    AbstractChannelHandlerContext next = ctx.next;
    //重建指针位置
    prev.next = next;
    next.prev = prev;
}

指针位置重建之后,我们回调handlerRemoved方法

代码语言:javascript
复制
callHandlerRemoved0(ctx);

至此我们就完成了pipeline的创建、添加、删除的源码解析!

4. 管道事件传播

我们前面见到过很多的事件传播代码,我们以 channelRegistered 方法的事件回调为例:

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

代码语言:javascript
复制
//通知管道  传播channelRegistered事件
// 触发 channelRegistered 事件
pipeline.fireChannelRegistered();

我们进入到改行代码的源码:

代码语言:javascript
复制
@Override
public final ChannelPipeline fireChannelRegistered() {
    //执行注册方法  从head方法
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}
代码语言:javascript
复制
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

我们可以看到,这里使用了 next.invokeChannelRegistered();方法 我们依旧按照同步方法进行分析!

代码语言:javascript
复制
private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            //现在调用的HeadContext的handler
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}

我们现在进入到了headContext,所以我们进入到: io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered

代码语言:javascript
复制
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
    invokeHandlerAddedIfNeeded();
    //向下传播事件
    ctx.fireChannelRegistered();
}

这一段代码除了执行Head的invokeHandlerAddedIfNeeded方法之外,还又一次传播了channelRegistered事件,我们进入到 ctx.fireChannelRegistered();:

代码语言:javascript
复制
@Override
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
    return this;
}

我们如果想要向下传播,我们首先应该找到下一个节点是谁才能传播,Netty这里调用了findContextInbound(MASK_CHANNEL_REGISTERED)查找下一个节点,我我们先关注以下参数 MASK_CHANNEL_REGISTERED, 他是channelRegistered方法的掩码, 我们进入到 findContextInbound方法源码:

代码语言:javascript
复制
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        //获取下一个inbun事件
        ctx = ctx.next;
        //只要和掩码&运算后不为0的都是 inbunt事件
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}

从当前节点向下寻找,只要 掩码计算包含这个方法,就证明该context包含channelRegistered方法,就直接返回!

寻找到了handler之后,就开始调用了:

代码语言:javascript
复制
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
代码语言:javascript
复制
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}
代码语言:javascript
复制
next.invokeChannelRegistered();

具体逻辑就和上面分析的一致了,调用该handler的ChannelRegistered方法!

传播某一个事件,就会使用哪个事件的掩码,从当前节点向下寻找,知道对应的Handler之后,回调对应的方法!

关于管道的传播,你明白了吗?

提一个问题, 观察以下两种传播方式有何不同:

代码语言:javascript
复制
ctx.fireChannelRegistered();
ctx.pipeline().fireChannelRegistered();

才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎关注作者的公众号,一起进步,一起学习!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-07-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 源码学徒 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、基础铺垫
    • 1. JAVA中的基本位运算符
      • 2. 位运算解释与实例
        • 3. 配合Netty实例
        • 二、源码解析
          • 1. 创建管道
            • 2. 向通道内添加一个Handler处理器
              • 3. 删除一个处理器
                • 4. 管道事件传播
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档