首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >第十七节 netty源码分析之pipeline的来源

第十七节 netty源码分析之pipeline的来源

作者头像
用户1418372
发布2019-03-05 10:40:48
3050
发布2019-03-05 10:40:48
举报
文章被收录于专栏:清晨我上码清晨我上码
pipeline的来源
  • 继续跟踪源码EchoClient中得Bootstrap类。 根据前面得分析总结如下过程:
Bootstrap中channel(NioSocketChannel.class)
-》NioSocketChannel的初始化时创建-》NioSocketChannel this(DEFAULT_SELECTOR_PROVIDER)
-》NioSocketChannel中this(newSocket(provider)) newSocket方法中的参数provider会根据操作系统选择不同的SelectorProvider(SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();)。 创建socketchannel方法provider.openSocketChannel(),SelectorProvider的用法可参考nio中的
-》继续调用构造器this(null, socket);其中的socket为newSocket方法创建的
-》接下来调用父类的构造器和初始化cofnig

查看NioSocketChannel的构造代码:

 public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

-》 转到父类的构造器中

   /**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     */
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }
--》 继续调用父类构造器会发现又一次掉用父类的构造器 super(parent);
--》 下面找到我们费尽心机的pipeline的创建如下:
    /**
    * Creates a new instance.
    *
    * @param parent
    *        the parent of this channel. {@code null} if there's no parent.
    */
   protected AbstractChannel(Channel parent) {
       this.parent = parent;
       id = newId();
       unsafe = newUnsafe();
       pipeline = newChannelPipeline();
   }
--》 上面的newChannelPipeline方法其实调用DefaultChannelPipeline的构造器我们跟踪一下piple的创建
 protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
* 从上面的构造方法中可以推算Pipeline包含为一个双向链表,且会有一个头和尾,类图如下

总结:每创建要给channel中会创建一个pipeline. pipeline为一个双向链表,会有一个头和尾
 头尾,从两者继承关系来看基本一致。区别在于实现接口不同,head实现ChannelOutboundHandler, ChannelInboundHandler而tail实现ChannelInboundHandler

图片.png

图片.png

再看下他们的父类 AbstractChannelHandlerContext 的构造器, 分别以参数 inbound , outbound .来区分head和tail 结合他们实现的接口,header 是一个 outboundHandler, 而 tail 是一个inboundHandler。

2 piple是如何将handler起到作用的呢? 1、 ChannelInitializer 的添加 初始化 Bootstrap, 会添加我们自定义的 ChannelHandler, 就以我们熟悉的 EchoClient 来举例

Bootstrap b = new Bootstrap();
            b.group(group)
            //初始化工厂ReflectiveChannelFactory为后续链接connect方法创建NioSocketChannel对象
             .channel(NioSocketChannel.class)
                    //将选项添加到AbstractBootstrap属性options. 实现类中Bootstrap的init(Channel channel)方法设置channel的类型
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });
            // Start the client.
            //包含channel(注意channel可抽象为socket链接来理解)实例化Bootstrap.connect -> Bootstrap.doConnect -> AbstractBootstrap.initAndRegister(最终channelFactory.newChannel();这里的factory就是前面设置的ReflectiveChannelFactory)
           //  NioSocketChannel最终newSocket 来打开一个新的 Java NIO SocketChannel, 最后调用父类AbstractChannel(Channel parent)
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();

在调用 handler 时, 传入了 ChannelInitializer 对象, 它提供了一个 initChannel 方法供我们初始化 ChannelHandler. 那么这个初始化过程是怎样的呢 源码追踪:首先 Bootstrap中调用handler方法 .handler(new ChannelInitializer<SocketChannel>(),将ChannelInitializer父类AbstractBootstrap中方法

  /**
     * the {@link ChannelHandler} to use for serving the requests.
     */
    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return self();
    }

初始化后的使用过程: 其实再ChannelFuture f = b.connect(HOST, PORT).sync();中的connect方法中,一路看下去会在父类AbstractBootstrap找到initAndRegister()初始化这楼里 是一个很重要的方法。暂时我们只关注 init(channel);这里是我们要关注的地方因为这里是channel和pipeline交互的关键,而且他的具体实现在Bootstrap中

  @Override
  @SuppressWarnings("unchecked")
  void init(Channel channel) throws Exception {
      ChannelPipeline p = channel.pipeline();
      // config使用bootstrap.handler(),就是最初ChannelInitializer,可参考上面分析
      p.addLast(config.handler());

      final Map<ChannelOption<?>, Object> options = options0();
      synchronized (options) {
          //设置channel类型
          setChannelOptions(channel, options, logger);
      }

最终我们要关注 p.addLast(config.handler());这里将我们出事的handler添加到了pipleline的末端(pipleline的结构借本介绍在前面已介绍) 为了添加一个 handler 到 pipeline 中, 必须把此 handler 包装成 ChannelHandlerContext. 因此在上面的代码中我们可以看到新实例化了一个 newCtx 对象, 并将 handler 作为参数传递到构造方法中 最后追踪到DefaultChannelPipeline中方法

@Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //检查是否重名
            checkMultiplicity(handler);
//创建DefaultChannelHandlerContext对象
            newCtx = newContext(group, filterName(name, handler), handler);
//            将生成的newCtx插入handlercontex链表中,addLast0方法会将newCtx插入tail之前
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
  • 我们来关注下newContext方法是如何构造
 DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
            //isInbound  isOutbound 方法返回true false 
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

ChannelInitializer的继承关系如下图:

图片.png

下一节再具体分析pipeline和handler的具体实现细节:

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.01.31 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • pipeline的来源
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档