首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Netty】如何接入新连接

【Netty】如何接入新连接

作者头像
用户3467126
修改2019-07-04 10:41:24
1.2K0
修改2019-07-04 10:41:24
举报
文章被收录于专栏:爱编码爱编码

前文再续,书接上一回【NioEventLoop】。 在研究NioEventLoop执行过程的时候,检测IO事件(包括新连接),处理IO事件,执行所有任务三个过程。其中检测IO事件中通过持有的selector去轮询事件,检测出新连接。这里复用同一段代码。

Channel的设计

在开始分析前,先了解一下Channel的设计

顶层Channel接口定义了socket事件如读、写、连接、绑定等事件,并使用AbstractChannel作为骨架实现了这些方法。查看器成员变量,发现大多数通用的组件,都被定义在这里

第二层AbstractNioChannel定义了以NIO,即Selector的方式进行读写事件的监听。其成员变量保存了selector相关的一些属性。

第三层内容比较多,定义了服务端channel(左边继承了AbstractNioMessageChannel的NioServerSocketChannel)以及客户端channel(右边继承了AbstractNioByteChannel的NioSocketChannel)。

如何接入新连接?

本文开始探索一下Netty是如何接入新连接?主要分为四个部分

1.检测新连接 2.创建NioSocketChannel 3.分配线程和注册Selector 4.向Selector注册读事件

1.检测新连接

Netty服务端在启动的时候会绑定一个bossGroup,即NioEventLoop,在bind()绑定端口的时候注册accept(新连接接入)事件。扫描到该事件后,便处理。因此入口从:NioEventLoop#processSelectedKeys()开始。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

        //省略代码

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead

        // to a spin loop

        //如果当前NioEventLoop是workGroup 则可能是OP_READ,bossGroup是OP_ACCEPT

        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {





            //新连接接入以及读事件处理入口

            unsafe.read();

        }

      }

关键的新连接接入以及读事件处理入口unsafe.read();

a).这里的unsafe是在Channel创建过程的时候,调用了父类AbstractChannel#AbstractChannel()的构造方法,和pipeline一起初始化的。

protected AbstractChannel(Channel parent) {

        this.parent = parent;

        id = newId();

        unsafe = newUnsafe();

        pipeline = newChannelPipeline();

    }

服务端: unsafe 为NioServerSockeChannel的父类AbstractNioMessageChannel#newUnsafe()创建,可以看到对应的是AbstractNioMessageChannel的内部类NioMessageUnsafe;

客户端: unsafe为NioSocketChannel的的父类AbstractNioUnsafe#newUnsafe()创建的话,它对应的是AbstractNioByteChannel的内部类NioByteUnsafe

b).unsafe.read()

NioMessageUnsafe.read()中主要的操作如下:

1.循环调用jdk底层的代码创建channel,并用netty的NioSocketChannel包装起来,代表新连接成功接入一个通道。 2.将所有获取到的channel存储到一个容器当中,检测接入的连接数,默认是一次接16个连接 3.遍历容器中的channel,依次调用方法fireChannelRead,4.fireChannelReadComplete,fireExceptionCaught来触发对应的传播事件。

private final class NioMessageUnsafe extends AbstractNioUnsafe {

        //临时存储读到的连接

        private final List<Object> readBuf = new ArrayList<Object>();





        @Override

        public void read() {

            assert eventLoop().inEventLoop();

            final ChannelConfig config = config();

            final ChannelPipeline pipeline = pipeline();





            //服务端接入速率处理器

            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

            allocHandle.reset(config);





            boolean closed = false;

            Throwable exception = null;

            try {

                try {

                    //while循环调用doReadMessages()创建新连接对象

                    do {

                        //获取jdk底层的channel,并加入readBuf容器

                        int localRead = doReadMessages(readBuf);

                        if (localRead == 0) {

                            break;

                        }

                        if (localRead < 0) {

                            closed = true;

                            break;

                        }

                        //把读到的连接做一个累加totalMessages,默认最多累计读取16个连接,结束循环

                        allocHandle.incMessagesRead(localRead);



                    } while (allocHandle.continueReading());

                } catch (Throwable t) {

                    exception = t;

                }



                //触发readBuf容器内所有的传播事件:ChannelRead 读事件

                int size = readBuf.size();

                for (int i = 0; i < size; i ++) {

                    readPending = false;

                    pipeline.fireChannelRead(readBuf.get(i));

                }

                //清空容器

                readBuf.clear();

                allocHandle.readComplete();

                //触发传播事件:ChannelReadComplete,所有的读事件完成

                pipeline.fireChannelReadComplete();





                if (exception != null) {

                    closed = closeOnReadError(exception);

                    //触发传播事件:exceptionCaught,触发异常

                    pipeline.fireExceptionCaught(exception);

                }





                if (closed) {

                    inputShutdown = true;

                    if (isOpen()) {

                        close(voidPromise());

                    }

                }

            } finally {

                // Check if there is a readPending which was not processed yet.

                // This could be for two reasons:

                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method

                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method

                //

                // See https://github.com/netty/netty/issues/2254

                if (!readPending && !config.isAutoRead()) {

                    removeReadOp();

                }

            }

        }

    }

而这一段关键代码逻辑中int localRead = doReadMessages(readBuf);它创建jdk底层channel并且用NioSocketChannel包装起来,将该channel添加到传入的容器保存起来,同时返回一个计数。

protected int doReadMessages(List<Object> buf) throws Exception {

        SocketChannel ch = SocketUtils.accept(javaChannel());





        try {

            if (ch != null) {

  //将jdk底层的channel封装到netty的channel,并存储到传入的容器当中

                //this为服务端channel

                buf.add(new NioSocketChannel(this, ch));

 //成功和创建 客户端接入的一条通道,并返回

                return 1;

            }

        } catch (Throwable t) {

            logger.warn("Failed to create a new channel from an accepted socket.", t);





            try {

                ch.close();

            } catch (Throwable t2) {

                logger.warn("Failed to close a socket.", t2);

            }

        }





        return 0;

    }

2.创建NioSocketChannel

通过检测IO事件轮询新连接,当前成功检测到连接接入事件之后,会调用NioServerSocketChannel#doReadMessages()方法,进行创建NioSocketChannel,即客户端channel的过程。

下面就来了解一下NioSocketChannel的主要工作: .查看原代码做了两件事,调用父类构造方法,实例化一个NioSocketChannelConfig。

public NioSocketChannel(Channel parent, SocketChannel socket) {

        super(parent, socket);

        //实例化一个NioSocketChannelConfig

        config = new NioSocketChannelConfig(this, socket.socket());

    }

1)、查看NioSocketChannel父类构造方法,主要是保存客户端注册的读事件、channel为成员变量,以及设置阻塞模式为非阻塞。

public NioSocketChannel(Channel parent, SocketChannel socket) {

        super(parent, socket);

        //实例化一个NioSocketChannelConfig

        config = new NioSocketChannelConfig(this, socket.socket());

    }

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {

        //传入感兴趣的读事件:客户端channel的读事件

        super(parent, ch, SelectionKey.OP_READ);

    }





    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {

        super(parent);

        //保存客户端channel为成员变量

        this.ch = ch;

        //保存感兴趣的读事件为成员变量

        this.readInterestOp = readInterestOp;

        try {

            //配置阻塞模式为非阻塞

            ch.configureBlocking(false);

        } catch (IOException e) {

            try {

                ch.close();

            } catch (IOException e2) {

                if (logger.isWarnEnabled()) {

                    logger.warn(

                            "Failed to close a partially initialized socket.", e2);

                }

            }





            throw new ChannelException("Failed to enter non-blocking mode.", e);

        }

    }

最后调用父类的构造方法,是设置该客户端channel对应的服务端channel,以及channel的id和两大组件unsafe和pipeline

protected AbstractChannel(Channel parent) {

        //parent为创建次客户端channel的服务端channel(服务端启动过程中通过反射创建的)

        this.parent = parent;

        id = newId();

        unsafe = newUnsafe();

        pipeline = newChannelPipeline();

    }

2)、再看NioSocketChannelConfig实例化。主要是保存了javaSocket,并且通过setTcpNoDelay(true);禁止了tcp的Nagle算法,目的是为了尽量让小的数据包整合成大的发送出去,降低延时.

private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {

            super(channel, javaSocket);

            calculateMaxBytesPerGatheringWrite();

        }





    public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {

        super(channel);

        if (javaSocket == null) {

            throw new NullPointerException("javaSocket");

        }

        //保存socket

        this.javaSocket = javaSocket;





        // Enable TCP_NODELAY by default if possible.

        if (PlatformDependent.canEnableTcpNoDelayByDefault()) {

            try {

                //禁止Nagle算法,目的是为了让小的数据包尽量集合成大的数据包发送出去

                setTcpNoDelay(true);

            } catch (Exception e) {

                // Ignore.

            }

        }

    }

3.分配线程和注册Selector

服务端启动初始化的时候ServerBootstrap#init(),主要做了一些参数的配置。其中对于childGroup,childOptions,childAttrs,childHandler等参数被进行了单独配置。作为参数和ServerBootstrapAcceptor一起,被当作一个特殊的handle,封装到pipeline中。ServerBootstrapAcceptor中的eventLoop为workGroup。

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

  //省略了很多代码.............

    @Override

    void init(Channel channel) throws Exception {





        //配置AbstractBootstrap.option

        final Map<ChannelOption<?>, Object> options = options0();

        synchronized (options) {

            setChannelOptions(channel, options, logger);

        }





        //配置AbstractBootstrap.att

        final Map<AttributeKey<?>, Object> attrs = attrs0();

        synchronized (attrs) {

            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {

                @SuppressWarnings("unchecked")

                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();

                channel.attr(key).set(e.getValue());

            }

        }

        //配置pipeline

        ChannelPipeline p = channel.pipeline();





        //获取ServerBootstrapAcceptor配置参数

        final EventLoopGroup currentChildGroup = childGroup;

        final ChannelHandler currentChildHandler = childHandler;





        final Entry<ChannelOption<?>, Object>[] currentChildOptions;

        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

        synchronized (childOptions) {

            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));

        }

        synchronized (childAttrs) {

            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));

        }





        p.addLast(new ChannelInitializer<Channel>() {

            @Override

            public void initChannel(final Channel ch) throws Exception {

                final ChannelPipeline pipeline = ch.pipeline();

                //配置AbstractBootstrap.handle

                ChannelHandler handler = config.handler();

                if (handler != null) {

                    pipeline.addLast(handler);

                }





                ch.eventLoop().execute(new Runnable() {

                    @Override

                    public void run() {

                        //配置ServerBootstrapAcceptor,作为Handle紧跟HeadContext

                        pipeline.addLast(new ServerBootstrapAcceptor(

                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

                    }

                });

            }

        });

    }





//省略了很多代码.............

}

可见,整个服务端pipeline的结构如下图所示。bossGroup控制IO事件的检测与处理,整个bossGroup对应的pipeline只包括头(HeadContext)尾(TailContext)以及中部的ServerBootstrap.ServerBootstrapAcceptor。

当新连接接入的时候AbstractNioMessageChannel.NioMessageUnsafe#read()方法被调用,最终调用fireChannelRead(),方法来触发下一个Handler的channelRead方法。而这个Handler正是ServerBootstrapAcceptor

它是ServerBootstrap的内部类,同时继承自ChannelInboundHandlerAdapter。也是一个ChannelInboundHandler。其中channelRead主要做了以下几件事。

1.为客户端channel的pipeline添加childHandler 2.设置客户端TCP相关属性childOptions和自定义属性childAttrs 3.workGroup选择NioEventLoop并注册Selector

1)、为客户端channel的pipeline添加childHandler

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {





        private final EventLoopGroup childGroup;

        private final ChannelHandler childHandler;

        private final Entry<ChannelOption<?>, Object>[] childOptions;

        private final Entry<AttributeKey<?>, Object>[] childAttrs;

        private final Runnable enableAutoReadTask;





        ServerBootstrapAcceptor(

                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,

                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {

            this.childGroup = childGroup;

            this.childHandler = childHandler;

            this.childOptions = childOptions;

            this.childAttrs = childAttrs;





       //省略了一些代码。。。。。

        @Override

        @SuppressWarnings("unchecked")

        public void channelRead(ChannelHandlerContext ctx, Object msg) {

            //该channel为客户端接入时创建的channel

            final Channel child = (Channel) msg;





            //添加childHandle

            child.pipeline().addLast(childHandler);





            //设置TCP相关属性:childOptions

            setChannelOptions(child, childOptions, logger);





            //设置自定义属性:childAttrs

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {

                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

            }





            try {

                //选择NioEventLoop并注册Selecto

                childGroup.register(child)

                        .addListener(new ChannelFutureListener() {

                    @Override

                    public void operationComplete(ChannelFuture future) throws Exception {

                        if (!future.isSuccess()) {

                            forceClose(child, future.cause());

                        }

                    }

                });

            } catch (Throwable t) {

                forceClose(child, t);

            }

        }

      //省略了一些代码。。。。。

    }

客户端channel的pipeline添加childHandler,在服务端EchoServer创建流程中,childHandler的时候,使用了ChannelInitializer的一个自定义实例。并且覆盖了其initChannel方法,改方法获取到pipeline并添加具体的Handler。查看ChannelInitializer具体的添加逻辑,handlerAdded方法。其实在initChannel逻辑中,首先是回调到用户代码执行initChannel,用户代码执行添加Handler的添加操作,之后将ChannelInitializer自己从pipeline中删除

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {





 @Override

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

        if (ctx.channel().isRegistered()) {

            // This should always be true with our current DefaultChannelPipeline implementation.

            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering

            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers

            // will be added in the expected order.





            //初始化Channel

            if (initChannel(ctx)) {





                // We are done with init the Channel, removing the initializer now.

                removeState(ctx);

            }

        }

    }





    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {

        if (initMap.add(ctx)) { // Guard against re-entrance.

            try {

                //回调到用户代码

                initChannel((C) ctx.channel());

            } catch (Throwable cause) {

                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).

                // We do so to prevent multiple calls to initChannel(...).

                exceptionCaught(ctx, cause);

            } finally {

                ChannelPipeline pipeline = ctx.pipeline();

                if (pipeline.context(this) != null) {

                    //删除本身

                    pipeline.remove(this);

                }

            }

            return true;

        }

        return false;

    }





}

2)、设置客户端TCP相关属性childOptions和自定义属性childAttrs 这点在ServerBootstrapAcceptor#init()方法中已经体现

3)、workGroup选择NioEventLoop并注册Selector 这要从AbstractBootstrap#initAndRegister()方法开始,然后跟踪源码会来到AbstractUnsafe#register()方法

protected abstract class AbstractUnsafe implements Unsafe {

      //省略了一些代码。。。。。

  @Override

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {

            if (eventLoop == null) {

                throw new NullPointerException("eventLoop");

            }

            if (isRegistered()) {

                promise.setFailure(new IllegalStateException("registered to an event loop already"));

                return;

            }

            if (!isCompatible(eventLoop)) {

                promise.setFailure(

                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));

                return;

            }





            AbstractChannel.this.eventLoop = eventLoop;





            if (eventLoop.inEventLoop()) {

                register0(promise);

            } else {

                try {

                    eventLoop.execute(new Runnable() {

                        @Override

                        public void run() {

                            register0(promise);

                        }

                    });

                } catch (Throwable t) {

                    logger.warn(

                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",

                            AbstractChannel.this, t);

                    closeForcibly();

                    closeFuture.setClosed();

                    safeSetFailure(promise, t);

                }

            }

        }

      //省略了一些代码。。。。。

}

最后调用AbstractNioUnsafe#doRegister()方法通过jdk的javaChannel().register完成注册功能。

protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {

      //省略了一些代码。。。。。

  @Override

    protected void doRegister() throws Exception {

        boolean selected = false;

        for (;;) {

            try {

                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

                return;

            } catch (CancelledKeyException e) {

                if (!selected) {

                    // Force the Selector to select now as the "canceled" SelectionKey may still be

                    // cached and not removed because no Select.select(..) operation was called yet.

                    eventLoop().selectNow();

                    selected = true;

                } else {

                    // We forced a select operation on the selector before but the SelectionKey is still cached

                    // for whatever reason. JDK bug ?

                    throw e;

                }

            }

        }

    }

      //省略了一些代码。。。。。

}

4.向Selector注册读事件

a)、入口:ServerBootstrap.ServerBootstrapAcceptor#channelRead()#childGroup.register();

public void channelRead(ChannelHandlerContext ctx, Object msg) {

            final Channel child = (Channel) msg;





            child.pipeline().addLast(childHandler);





            setChannelOptions(child, childOptions, logger);





            for (Entry<AttributeKey<?>, Object> e: childAttrs) {

                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

            }





            try {

                childGroup.register(child).addListener(new ChannelFutureListener() {

                    @Override

                    public void operationComplete(ChannelFuture future) throws Exception {

                        if (!future.isSuccess()) {

                            forceClose(child, future.cause());

                        }

                    }

                });

            } catch (Throwable t) {

                forceClose(child, t);

            }

        }

b)、实际上调用了AbstractChannel.AbstractUnsafe#register0(),触发了通道激活事件;

//触发通道激活事件,调用HeadContent的

   pipeline.fireChannelActive();

c)、pipeline的头部开始,即DefaultChannelPipeline.HeadContext#channelActive()从而触发了readIfIsAutoRead();

@Override

  public void channelActive(ChannelHandlerContext ctx) {

            ctx.fireChannelActive();





            readIfIsAutoRead();

  }

d)、读事件将从尾部的TailContent#read()被触发,从而依次执行ctx.read(),从尾部开始,每个outboundHandler的read()事件都被触发。直到头部。

@Override

    public final ChannelPipeline read() {

        tail.read();

        return this;

    }









    @Override

    public ChannelHandlerContext read() {

        //获取最近的outboundhandle

        final AbstractChannelHandlerContext next = findContextOutbound();

        EventExecutor executor = next.executor();





        //并依次执行其read方法

        if (executor.inEventLoop()) {

            next.invokeRead();

        } else {

            Tasks tasks = next.invokeTasks;

            if (tasks == null) {

                next.invokeTasks = tasks = new Tasks(next);

            }

            executor.execute(tasks.invokeReadTask);

        }





        return this;

    }

e)、进入头部HeadContext#read(),并且最终更改了selectionKey,向selector注册了读事件

HeadContext#read()-->AbstractChannel#beginRead()-->AbstractNioMessageChannel#doBeginRead-->AbstractNioChannel#doBeginRead()

@Override

    protected void doBeginRead() throws Exception {

        // Channel.read() or ChannelHandlerContext.read() was called

        final SelectionKey selectionKey = this.selectionKey;

        if (!selectionKey.isValid()) {

            return;

        }





        readPending = true;





        final int interestOps = selectionKey.interestOps();

        if ((interestOps & readInterestOp) == 0) {

            selectionKey.interestOps(interestOps | readInterestOp);

        }

    }

参考文章:

https://www.jianshu.com/u/fc9c660e9843

总结

Netty如何接入新连接基本流程如上所述,如果有误,还望各位指正。建议先从前两篇看起比较好理解点。

【Netty】服务端和客户端

学习NioEventLoop

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 爱编码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Channel的设计
  • 如何接入新连接?
  • 1.检测新连接
  • 2.创建NioSocketChannel
  • 3.分配线程和注册Selector
  • 4.向Selector注册读事件
  • 总结
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档