前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >第十九节 netty源码分析之 pipleline和handler以及pipeline的数据流向01

第十九节 netty源码分析之 pipleline和handler以及pipeline的数据流向01

作者头像
用户1418372
发布2019-03-04 15:03:08
4690
发布2019-03-04 15:03:08
举报
文章被收录于专栏:清晨我上码清晨我上码

pipleline和handler以及pipeline的数据流向

  • 先理一下思路。首先我们考虑到之前的文章分析,没创建一个channel就会创建一个pipeline与之对应。每个pipeline会有AbstractChannelHandlerContext属性的tail和head从而组成要给双向链表。那么pipeline的handler添加和数据流向其实都是基于HandlerContext和双向链表的性质。下面具体分析。

当然我们仍然下面这段代码分析,主要分析pipeline的添加

代码语言:javascript
复制
b.group(group)
            //初始化工厂ReflectiveChannelFactory为后续链接connect方法创建NioSocketChannel对象
             .channel(NioSocketChannel.class)
                    //将选项添加到AbstractBootstrap属性options. 实现类中Bootstrap的init(Channel channel)方法设置channel的类型
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

pipeline.addXXX 都有一个重载的方法, 例如 addLast, 它有一个重载的版本 直接查看DefaultChannelPipeline源码:

代码语言:javascript
复制
 public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

我们一路查看下去,找到重载的方法,且记住我们入参里group、和name都是null

代码语言:javascript
复制
 @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //检查是否重复添加
            checkMultiplicity(handler);
//创建DefaultChannelHandlerContext对象
            newCtx = newContext(group, filterName(name, handler), handler);
//            将生成的newCtx插入handlercontex链表中
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

从addLast0方法看到,这里是将我们的handler添加到了tail的前面

代码语言:javascript
复制
private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    ```
* 还有一点就是上面addLast方法中newContext方法的入参filterName(name, handler),这里会生成handler名字并校验是否重复(有兴趣可查看源码类DefaultChannelPipeline中  generateName(ChannelHandler handler))

2、 那么inbond和outbond是决定pipleline的数据流向的关键。
记得我们上面的newContext方法中创建的DefaultChannelHandlerContext里的构造器,有isInbound和isOutbound俩方法分别根据接口来判断Inbound和Outbound
```java
 DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

一个Inbound事件通常由Inbound handler来处理。一个Inbound handler通常处理在IO线程产生的Inbound数据。Inbound数据通过真实的输入操作如 SocketChannel#read(ByteBuffer)来获取。如果一个inbound事件越过了最上面的inbound handler,该事件将会被抛弃到而不会通知你 一个outbound事件由outbound handler来处理。一个outbound handler通常由outbound流量如写请求产生或者转变的。如果一个outbound事件越过了底部的outbound handler,它将由channel关联的IO线程处理。IO线程通常运行的是真实的输出操作如 SocketChannel#write(byteBuffer).

代码语言:javascript
复制
 inbound 事件传播方法:
ChannelHandlerContext#fireChannelRegistered()
 ChannelHandlerContext#fireChannelActive()
  ChannelHandlerContext#fireChannelRead(Object) 
  ChannelHandlerContext#fireChannelReadComplete() 
  ChannelHandlerContext#fireExceptionCaught(Throwable) 
  ChannelHandlerContext#fireUserEventTriggered(Object) 
  ChannelHandlerContext#fireChannelWritabilityChanged() 
  ChannelHandlerContext#fireChannelInactive() 
  ChannelHandlerContext#fireChannelUnregistered()
outbound事件传播方法:
ChannelHandlerContext#bind(SocketAddress, ChannelPromise) 
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise) 
ChannelHandlerContext#write(Object, ChannelPromise) 
ChannelHandlerContext#flush() 
ChannelHandlerContext#read() 
ChannelHandlerContext#disconnect(ChannelPromise) 
ChannelHandlerContext#close(ChannelPromise) 
ChannelHandlerContext#deregister(ChannelPromise)

如果我们捕获了一个事件, 并且想让这个事件继续传递下去, 那么需要调用 Context 相应的传播方法. 例如:

代码语言:javascript
复制
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Connected!");
        ctx.fireChannelActive();
    }
}

public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println("Closing ..");
        ctx.close(promise);
    }
}

上面的例子中, MyInboundHandler 收到了一个 channelActive 事件, 它在处理后, 如果希望将事件继续传播下去, 那么需要接着调用 ctx.fireChannelActive().

Outbound 操作(outbound operations of a channel) 以connect为例

Bootstrap.connect -> Bootstrap.doResolveAndConnect -> Bootstrap.doResolveAndConnect0 ->Bootstrap.doConnect ->AbstractChannel.connect->pipeline.connect->tail.connect-> AbstractChannelHandlerContext.connect 最后我们以AbstractChannelHandlerContext.connect 的源码分析:

代码语言:javascript
复制
@Override
    public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }
// DefaultChannelPipeline 内的双向链表的 tail 开始, 不断向前寻找第一个 outbound 为 true 的 AbstractChannelHandlerContext, 然后调用它的 invokeConnect
        final AbstractChannelHandlerContext next = findContextOutbound();
        //next其实是找到headContext  unsafe.connect(remoteAddress, localAddress, promise);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
当我们找到了一个 outbound 的 Context 后, 就调用它的 invokeConnect 方法, 
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

invokeConnect这个方法中会调用 Context 所关联着的 ChannelHandler 的 connect 方法。下面的handler()方法会返回一个handlerContex(根据上面next方法我们知道这里返回的为tailContext,但是tailContext并没有实现connect方法,所以这里的connect为其父类AbstractChannelHandlerContext的connect方法。也就是说再次从上面哪个方法开始,知道执行到headContext时,它实现了connect方法如下 方法三)。然后调用connect,包装handler所以即为hanler的connect。

代码语言:javascript
复制
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                //pipeline.channel().unsafe();
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }

方法三 headContext类找的connect方法

@Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); } 所以最终是unsafe.connect,而这unsafe的由来我们前面也分析过,看HeadContext的构造器unsafe = pipeline.channel().unsafe(); 所以它是来自channel。那么channel来自哪呢

代码语言:javascript
复制
  HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, true, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }

还记得 .channel(NioSocketChannel.class)这里就是channel的来源 接下来我们找到unsafe是哪里创建的,查看NioSocketChannel构造器

代码语言:javascript
复制
 public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

构造器跟踪流程:NioSocketChannel->AbstractNioByteChannel->AbstractChannel

代码语言:javascript
复制
 protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

同时newUnsafe() 并没有再AbstractChannel实现,而是在NioSocketChannel实现,这是为什么呢?

代码语言:javascript
复制
 protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }

其实在子类实现,是由于不同的协议用的Unsafe会不同,所以要根据子类区别对待 继续跟踪NioSocketChannelUnsafe但是该类并未实现connect方法,所以查找父类直到找到 AbstractNioUnsafe中的connect方法

代码语言:javascript
复制
 @Override
        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
           //省略
                //doConnect这里的实现在子类中
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // Schedule connect timeout.
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }
                   //省略
        }

这段代码的重点就在doConnect,而这个方法在该类中是没有实现的,实现类在子类NioSocketChannel中

代码语言:javascript
复制
 @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

注意:上面这个方法就是和java的NIO联系的地方了 重点分析: 1、首先doBind0方法 使用SocketUtils.bind(javaChannel(), localAddress); 其中的javaChannel()根据java版本选择nio还是bio

代码语言:javascript
复制
private void doBind0(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            SocketUtils.bind(javaChannel(), localAddress);
        } else {
            SocketUtils.bind(javaChannel().socket(), localAddress);
        }
    }

在NIO中javaChannel其实获取SelectableChannel,这里SelectableChannel在之前介绍过。所以bind方法 最后调用socketChannel.bind(address); nio的在前面已介绍这里不再赘述。至此connect方法就追踪到这里。 总结connect事件在outbound中的顺序,结合上面Bootstrap.connect最后到达handler的connect就形成了下面这个循环

Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect 直到head中的connect,这里我们上面分析过了。所以从connect事件来管中窥豹的话。就借用官网的数据流程图吧 参考官网的事件流转图

代码语言:javascript
复制
                                             I/O Request
                                        via Channel or
                                    ChannelHandlerContext
                                                  |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
              |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+
  • 由于篇幅过长下篇继续讲解Inbound事件的源码
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.02.13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • pipleline和handler以及pipeline的数据流向
相关产品与服务
腾讯云代码分析
腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档