专栏首页开发架构二三事netty源码分析四之客户端接入与数据写出

netty源码分析四之客户端接入与数据写出

上文中分析了ServerBootstrap的初始化流程,其实也就是NioServerSockerChannel的轮询线程reactor线程的初始化过程。本节咱们聊一聊客户端也就是NioSocketChannel的接入方式。

在上文Bootstrap初始化流程分析中我们知道,在NioServerSocketChannel进行register时,会调用eventLoop.execute方法:

//这里是传入的eventLoopAbstractChannel.this.eventLoop = eventLoop;//如果是内部线程if (eventLoop.inEventLoop()) {    register0(promise);} else {    try {        //如果不是内部线程        eventLoop.execute(new Runnable() {            @Override            public void run() {                register0(promise);            }        });

由于调用方法的线程是进行初始化操作的main线程,所以会进入eventLoop.execute方法,调用的是io.netty.util.concurrent.SingleThreadEventExecutor#execute:

@Override    public void execute(Runnable task) {        if (task == null) {            throw new NullPointerException("task");        }
        boolean inEventLoop = inEventLoop();        if (inEventLoop) {            //内部线程在往任务队列中添加任务时执行addTask            addTask(task);        } else {            //外部线程在往任务队列里面添加任务的时候执行 startThread() ,            // netty会判断reactor线程有没有被启动,如果没有被启动,那就启动线程再往任务队列里面添加任务            startThread();            addTask(task);            if (isShutdown() && removeTask(task)) {                reject();            }        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {            wakeup(inEventLoop);        }    }

在startThread方法中,会调用doStartThread方法,然后调用eventLoop线程的run方法:

   private void startThread() {        //检查下线程是否已经启动        if (state == ST_NOT_STARTED) {            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {                doStartThread();            }        }    }
     /**        * SingleThreadEventExecutor 在执行doStartThread的时候,会调用内部执行器executor的execute方法,        * 将调用NioEventLoop的run方法的过程封装成一个runnable塞到一个线程中去执行        */   private void doStartThread() {       assert thread == null;       System.out.println("do start Thread:" + executor);       //executor 默认是ThreadPerTaskExecutor       //该线程就是executor创建,对应netty的reactor线程实体       //默认情况下,ThreadPerTaskExecutor 在每次执行execute       // 方法的时候都会通过DefaultThreadFactory创建一个FastThreadLocalThread线程,而这个线程就是netty中的reactor线程实体       executor.execute(new Runnable() {           @Override           public void run() {               thread = Thread.currentThread();               if (interrupted) {                   thread.interrupt();               }
               boolean success = false;               updateLastExecutionTime();               try {                   //调用的是io.netty.channel.nio.NioEventLoop.run                   SingleThreadEventExecutor.this.run();                   success = true;               } catch (Throwable t) {                   logger.warn("Unexpected exception from an event executor: ", t);         ............           

io.netty.channel.nio.NioEventLoop#run:

 @Override    protected void run() {        for (;;) {            try {                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.SELECT:                        //首先轮询注册到reactor线程对用的selector上的所有的channel的IO事件                        //wakenUp 表示是否应该唤醒正在阻塞的select操作,可以看到netty在进行一次新的loop之前,                        // 都会将wakeUp 被设置成false,标志新的一轮loop的开始,具体的select操作我们也拆分开来看                        select(wakenUp.getAndSet(false));                        if (wakenUp.get()) {                           //轮询结束,对wakenUp进行重置,方便下一次轮询                            selector.wakeup();                        }                    default:                        // fallthrough                }
                cancelledKeys = 0;                needsToSelectAgain = false;                final int ioRatio = this.ioRatio;                //关于ioRatio见: https://github.com/netty/netty/issues/6058                //ioRatio只是用来控制io相关的任务,和线程的设置没啥关系,这里的io相关的任务并不是io密集的操作,大多数都是cpu用来拷贝字节块的操作,和io无关                if (ioRatio == 100) {                    try {                        //处理产生网络IO事件的channel                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        //处理任务队列                        runAllTasks();                    }                } else {                    final long ioStartTime = System.nanoTime();                    try {                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        final long ioTime = System.nanoTime() - ioStartTime;                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                    }                }      .........

ioRatio只是用来控制io相关的任务,和线程的设置没啥关系,这里的io相关的任务并不是io密集的操作,大多数都是cpu用来拷贝字节块的操作,和io无关

每次轮询之后都会进入到processSelectedKeys方法:

private void processSelectedKeys() {        if (selectedKeys != null) {            processSelectedKeysOptimized();        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }

processSelectedKeysOptimized方法:

 private void processSelectedKeysOptimized() {        for (int i = 0; i < selectedKeys.size; ++i) {            final SelectionKey k = selectedKeys.keys[i];            // null out entry in the array to allow to have it GC'ed once the Channel close            // See https://github.com/netty/netty/issues/2363            selectedKeys.keys[i] = null;
            final Object a = k.attachment();            //在ServerBootStrap初始化创建NioServerSocketChannel时会将NioServerSocketChannel的引用放入attachment中,这里是取出对应的引用            if (a instanceof AbstractNioChannel) {                processSelectedKey(k, (AbstractNioChannel) a);            } else {                @SuppressWarnings("unchecked")                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);            }            //是否需要再一次select            if (needsToSelectAgain) {                // null out entries in the array to allow to have it GC'ed once the Channel close                // See https://github.com/netty/netty/issues/2363                selectedKeys.reset(i + 1);
                selectAgain();                i = -1;            }        }    }

在ServerBootStrap初始化创建NioServerSocketChannel时会将NioServerSocketChannel的引用放入attachment中,这里是取出对应的引用。接着看io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)方法的关键部分:

 int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking    // See https://github.com/netty/netty/issues/924    int ops = k.interestOps();    ops &= ~SelectionKey.OP_CONNECT;    k.interestOps(ops);
    unsafe.finishConnect();}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write    ch.unsafe().forceFlush();}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {    //进入的是    unsafe.read();}

在reactor线程初始化完成后readyOps的值是OP_ACCEPT的值。当有新连接接入时,会进入unsafe.read()方法,毫无疑问,这个unsafe指的是NioMessageUnsafe,关于这点在之前的几篇文章中都有提到。

我们来看read方法,io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read:

 try {        do {            int localRead = doReadMessages(readBuf);            if (localRead == 0) {                break;            }            if (localRead < 0) {                closed = true;                break;            }
            allocHandle.incMessagesRead(localRead);        } while (allocHandle.continueReading());    } catch (Throwable t) {        exception = t;    }
    int size = readBuf.size();    for (int i = 0; i < size; i ++) {        readPending = false;        pipeline.fireChannelRead(readBuf.get(i));    }    readBuf.clear();

我们一步步来看,首先doReadMessages(readBuf):

 @Override    protected int doReadMessages(List<Object> buf) throws Exception {        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {            if (ch != null) {                buf.add(new NioSocketChannel(this, ch));                return 1;            }        } catch (Throwable t) {            logger.warn("Failed to create a new channel from an accepted socket.", t);
            try {                ch.close();            } catch (Throwable t2) {                logger.warn("Failed to close a socket.", t2);            }        }
        return 0;    }

new NioSocketChannel并将channel放入buf中,其中buf的结构是一个List,需要注意的一条是,在初始化NioSocketChannel时建立的unsafe为NioSocketChannelUnsafe:

io.netty.channel.socket.nio.NioSocketChannel#newUnsafe
 @Override    protected AbstractNioUnsafe newUnsafe() {        return new NioSocketChannelUnsafe();    }

对列表执行pipeline.fireChannelRead(readBuf.get(i)),并在成功后clear了列表。

我们看下pipeline.fireChannelRead(readBuf.get(i)),最终会调用到io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)方法:

private void invokeChannelRead(Object msg) {    if (invokeHandler()) {        try {            ((ChannelInboundHandler) handler()).channelRead(this, msg);        } catch (Throwable t) {            notifyHandlerException(t);        }    } else {        fireChannelRead(msg);    }}

也就是开始执行pipeline了,回想一下我们在初始化的时候是不是往ServerSocketChannel的pipeline中添加了一个ServerBootstrapAcceptor,我们看下 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead:

 @Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {    final Channel child = (Channel) msg;    System.out.println(child);    //这里把ServerBootstrap传入的childHandler添加到pipeline    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());    }
    try {        //childGroup对应的是外面传入的workerGroup        //这里的child是指每个连接对应的SocketChannel        childGroup.register(child).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    forceClose(child, future.cause());                }            }        });    } catch (Throwable t) {        forceClose(child, t);    }}

由上下文可以知道传入的Object msg的实际值是NioSocketChannel。在这里会将初始化时传入的用户自定义的handler放入SocketChannel的pipeline中。

我们接下来看下childGroup.register(child),它会进入到io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel):

  @Override    public ChannelFuture register(Channel channel) {        return next().register(channel);    }

从EventLoopGroup初始化那一篇的分析我们可以知道,next()方法是通过chooser从初始化的workerGroup的EventLoop列表中随机选一个。进入io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel):

@Override    public ChannelFuture register(Channel channel) {        //new DefaultChannelPromise(channel, this) 这里是将this作为DefaultChannelPromise的executor        return register(new DefaultChannelPromise(channel, this));    }
  @Override    public ChannelFuture register(final ChannelPromise promise) {        ObjectUtil.checkNotNull(promise, "promise");        //注意,这里往下面的register(EventLoop eventLoop, final ChannelPromise promise)方法中传入的eventLoop是this,也就是        promise.channel().unsafe().register(this, promise);        return promise;    }    

这里需要注意的是register方法传入的eventLoop是在workerGroup的EventLoop数组中随机选取的一个。

同时由上文我们知道这个unsafe为NioSocketChannelUnsafe,它的register方法调用的是父类中的io.netty.channel.AbstractChannel.AbstractUnsafe#register方法,与NioServerSocketChannel相同。

 @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {    if (eventLoop == null) {        throw new NullPointerException("eventLoop");    }    //查看一下当前channel是不是已经注册过了    if (isRegistered()) {        promise.setFailure(new IllegalStateException("registered to an event loop already"));        return;    }    if (!isCompatible(eventLoop)) {        promise.setFailure(                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));        return;    }    //这里是传入的eventLoop    AbstractChannel.this.eventLoop = eventLoop;    //如果是内部线程    if (eventLoop.inEventLoop()) {        register0(promise);    } else {        try {            //如果不是内部线程            eventLoop.execute(new Runnable() {                @Override                public void run() {                    register0(promise);                }            });        } catch (Throwable t) {            logger.warn(                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",                    AbstractChannel.this, t);            closeForcibly();            closeFuture.setClosed();            safeSetFailure(promise, t);        }    }}

这里又会进入eventLoop.inEventLoop()判断,这里客户端SocketChannel的注册是在reactor线程即bossEventLoopGroup中进行的,而register方法传入的是 workerGroup初始化时的EventLoop列表中随机选择的一个EventLoop,它和reactor线程的eventLoop明显不同。所以它会进入eventLoop.execute(new Runnable())方法。

之前也提到过eventLoop.execute(new Runnable()方法,这里再简单分析一下,它调用的是io.netty.util.concurrent.SingleThreadEventExecutor#execute:

  @Override    public void execute(Runnable task) {        if (task == null) {            throw new NullPointerException("task");        }
        boolean inEventLoop = inEventLoop();        if (inEventLoop) {            //内部线程在往任务队列中添加任务时执行addTask            addTask(task);        } else {            //外部线程在往任务队列里面添加任务的时候执行 startThread() ,            // netty会判断reactor线程有没有被启动,如果没有被启动,那就启动线程再往任务队列里面添加任务            //worker线程也是一样            startThread();            addTask(task);            if (isShutdown() && removeTask(task)) {                reject();            }        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {            wakeup(inEventLoop);        }    }
  • execute方法也是在reactor线程中执行的,所以这里会进入到下面的 startThread()方法,在 startThread()方法中会进入当前worker EventLoop的线程启动工作。
  • 当前worker EventLoop线程启动之后,将执行registor0的Runnable任务放入到EventLoop的队列中。由于启动EventLoop线程后,EventLoop的run方法是一个死循环,它会在发现队列中有任务时将任务取出并执行。

我们进入看一下io.netty.util.concurrent.SingleThreadEventExecutor#startThread:

private void startThread() {        //检查下线程是否已经启动        if (state == ST_NOT_STARTED) {            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {                doStartThread();            }        }    }

会先检查当前EventLoop线程是否已经启动,这是因为workGroup初始化时初始化的是一个线程组,然后里面会初始化一些线程,如果没有指定线程个数,会初始化cpu核数*2个线程。这些线程是要重复使用的,是池化的线程。所以如果是已经启动,就不需要再doStartThread()去启动线程了。io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread:

private void doStartThread() {    assert thread == null;    System.out.println("do start Thread:" + executor);    //executor 默认是ThreadPerTaskExecutor    //对应的workerGroup也有这样一个executor    executor.execute(new Runnable() {        @Override        public void run() {            thread = Thread.currentThread();            if (interrupted) {                thread.interrupt();            }
            boolean success = false;            updateLastExecutionTime();            try {                //调用的是io.netty.channel.nio.NioEventLoop.run                System.out.println("thread is:" + Thread.currentThread() + "    SingleThreadEventExecutor.this is: " + SingleThreadEventExecutor.this);                SingleThreadEventExecutor.this.run();                success = true;                ...

SingleThreadEventExecutor.this.run()就是调用NioEventLoop的run方法。这个run方法是一个死循环。关于run方法我们在下文会进行分析。

这时我们再回过头来看register0(promise),实际调用的是io.netty.channel.AbstractChannel.AbstractUnsafe#register0方法:

private void register0(ChannelPromise promise) {    try {        // check if the channel is still open as it could be closed in the mean time when the register        // call was outside of the eventLoop        if (!promise.setUncancellable() || !ensureOpen(promise)) {            return;        }        //默认值是true        boolean firstRegistration = neverRegistered;        //进行注册        doRegister();        //注册结束后设为false        neverRegistered = false;        //将状态改成已注册        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the        // user may already fire events through the pipeline in the ChannelFutureListener.        // 添加用户添加的PendingHandlerAddedTask        pipeline.invokeHandlerAddedIfNeeded();        //确保promise设置成功        safeSetSuccess(promise);        //回调头节点的channelRegistered方法        pipeline.fireChannelRegistered();        // Only fire a channelActive if the channel has never been registered. This prevents firing        // multiple channel actives if the channel is deregistered and re-registered.        //如果 Channel 处于 open 状态,则调用 doRegister() 方法完成注册,然后将注册结果设置为成功。        // 最后判断如果是首次注册且处于激活状态,则发起 pipeline 的 fireChannelActive()        if (isActive()) {            ////如果是首次注册,发起 pipeline 的 fireChannelActive            if (firstRegistration) {                pipeline.fireChannelActive();            } else if (config().isAutoRead()) {                // This channel was registered before and autoRead() is set. This means we need to begin read                // again so that we process inbound data.                //                // See https://github.com/netty/netty/issues/4805                //这种情况会出现在先deregister然后再register时                // ChannelFuture cf = ctx.deregister();                //        cf.addListener(new ChannelFutureListener() {                //            @Override                //            public void operationComplete(ChannelFuture channelFuture) throws Exception {                //                eventLoop.register(channelFuture.channel()).addListener(completeHandler);                //            }                //        });                beginRead();            }        }    } catch (Throwable t) {        // Close the channel directly to avoid FD leak.        closeForcibly();        closeFuture.setClosed();        safeSetFailure(promise, t);    }}

其他的在这里不多说明了,我们看下doRegister()方法,io.netty.channel.nio.AbstractNioChannel#doRegister:

@Override    protected void doRegister() throws Exception {        boolean selected = false;        for (;;) {            try {                //这里注册时 ops 设置的是 0,也就是说 SocketChannel 仅仅只是表示了注册成功,还不能监听任何网络操作                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);                System.out.println("javaChennel:" + javaChannel() + "   selectionKey:" + selectionKey.readyOps());                return;            } catch (CancelledKeyException e) {                if (!selected) {                    // Force the Selector to select now as the "canceled" SelectionKey may still be                    // cached and not removed because no Select.select(..) operation was called yet.                    eventLoop().selectNow();                    selected = true;                } else {                    // We forced a select operation on the selector before but the SelectionKey is still cached                    // for whatever reason. JDK bug ?                    throw e;                }            }        }    }

向eventLoop().unwrappedSelector()上注册SocketChannel,并设置ops为0(目的是方便通过SelectionKey.interestOps进行设置),并将当前SocketChannel作为attachment传入。其中eventLoop().unwrappedSelector()得到的是当前EventLoop独有的 Selector对象,关于这点可以看下NioEventLoop的构造方法:

 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {        //executor为ThreadPerTaskExecutor        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);        if (selectorProvider == null) {            throw new NullPointerException("selectorProvider");        }        if (strategy == null) {            throw new NullPointerException("selectStrategy");        }        provider = selectorProvider;        final SelectorTuple selectorTuple = openSelector();        selector = selectorTuple.selector;        System.out.println("NioEventLoop is:" + this + "   selector is:" + selector);        unwrappedSelector = selectorTuple.unwrappedSelector;        selectStrategy = strategy;    }

每个NioEventLoop都会打开一个selector,在初始化workerGroup时,workerGroup中维护着多个NioEventLoop,然后每个SocketChannel都会与其中一个NioEventLoop的selector完成注册关系。其中NioEventLoop的启动 是通过那个EventLoopGroup的executor对象来完成的,NioEventGroup的executor是ThreadPerTaskExecutor,bossGroup对应一个ThreadPerTaskExecutor对象,workerGroup对应另一个ThreadPerTaskExecutor对象。ThreadPerTaskExecutor对象启动EventLoop线程的位置在上面已经讲过的io.netty.util.concurrent.SingleThreadEventExecutor#startThread。

SingleThreadEventExecutor的startThread方法会先判断这个NioEventLoop线程有没有启动,如果已经启动就直接返回,如果没有启动,会进行doStartThread操作:

我们进入SingleThreadEventExecutor.this.run()方法,也就是io.netty.channel.nio.NioEventLoop#run:

 @Override    protected void run() {        for (;;) {            try {                System.out.println("nioEventLoop :" + Thread.currentThread() + " this is:" + this);                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.SELECT:                        //首先轮询注册到reactor线程对用的selector上的所有的channel的IO事件                        //wakenUp 表示是否应该唤醒正在阻塞的select操作,可以看到netty在进行一次新的loop之前,                        // 都会将wakeUp 被设置成false,标志新的一轮loop的开始,具体的select操作我们也拆分开来看                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();                        }                    default:                        // fallthrough                }
                cancelledKeys = 0;                needsToSelectAgain = false;                final int ioRatio = this.ioRatio;                //关于ioRatio见: https://github.com/netty/netty/issues/6058                //ioRatio只是用来控制io相关的任务,和线程的设置没啥关系,这里的io相关的任务并不是io密集的操作,大多数都是cpu用来拷贝字节块的操作,和io无关                if (ioRatio == 100) {                    try {                        //处理产生网络IO事件的channel                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        //处理任务队列                        runAllTasks();                    }                } else {                    final long ioStartTime = System.nanoTime();                    try {                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        final long ioTime = System.nanoTime() - ioStartTime;                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                    }                }            } catch (Throwable t) {                handleLoopException(t);            }            // Always handle shutdown even if the loop processing threw an exception.            try {                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        return;                    }                }            } catch (Throwable t) {                handleLoopException(t);            }        }    }

这里我们主要分析一下select(wakenUp.getAndSet(false))方法和processSelectedKeys()方法,关于runAllTasks()和runAllTasks(ioTime * (100 - ioRatio) / ioRatio)方法将在之后的netty的线程和任务中专门分析。

select(wakenUp.getAndSet(false))

 private void select(boolean oldWakenUp) throws IOException {        Selector selector = this.selector;        try {            int selectCnt = 0;            long currentTimeNanos = System.nanoTime();            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);            for (;;) {                //定时任务截止事时间快到了,中断本次轮询                //我们可以看到,NioEventLoop中reactor线程的select操作也是一个for循环,在for循环第一步中,                // 如果发现当前的定时任务队列中有任务的截止事件快到了(<=0.5ms),就跳出循环。                // 此外,跳出之前如果发现目前为止还没有进行过select操作(if (selectCnt == 0)),那么就调用一次selectNow(),该方法会立即返回,不会阻塞                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;                if (timeoutMillis <= 0) {                    if (selectCnt == 0) {                        selector.selectNow();                        selectCnt = 1;                    }                    break;                }
                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call                // Selector#wakeup. So we need to check task queue again before executing select operation.                // If we don't, the task might be pended until select operation was timed out.                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.                //轮询过程中发现有任务加入,中断本次轮询                //如果不这么做,那么新加入的任务就会被挂起直到select操作超时,如果pipeline添加了IdleStateHandler,那么它可能挂起直到idle超时                //netty为了保证任务队列能够及时执行,在进行阻塞select操作的时候会判断任务队列是否为空,如果不为空,就执行一次非阻塞select操作,跳出循环                if (hasTasks() && wakenUp.compareAndSet(false, true)) {                    selector.selectNow();                    selectCnt = 1;                    break;                }                //阻塞式select操作                //执行到这一步,说明netty任务队列里面队列为空,并且所有定时任务延迟时间还未到(大于0.5ms),                // 于是,在这里进行一次阻塞select操作,截止到第一个定时任务的截止时间
                //如果第一个定时任务的延迟非常长,比如一个小时,那么有没有可能线程一直阻塞在select操作,当然有可能!                // But,只要在这段时间内,有新任务加入,该阻塞就会被释放
                int selectedKeys = selector.select(timeoutMillis);                selectCnt ++;
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {                    // - Selected something,                    // - waken up by user, or                    // - the task queue has a pending task.                    // - a scheduled task is ready for processing                    break;                }                if (Thread.interrupted()) {                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.                    // As this is most likely a bug in the handler of the user or it's client library we will                    // also log it.                    //                    // See https://github.com/netty/netty/issues/2426                    if (logger.isDebugEnabled()) {                        logger.debug("Selector.select() returned prematurely because " +                                "Thread.currentThread().interrupt() was called. Use " +                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");                    }                    selectCnt = 1;                    break;                }                //解决jdk的nio bug                //                //关于该bug的描述见 http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055)                //The problem is around a channel that was originally registered with Selector for i/o gets closed on the server side (due to early client side exit).  But the server side can know about such channel only when it does i/o (read/write) and thereby getting into an IO exception. In this case, (bug 6595055)there are times (erroneous) when server side (selector) did not know the channel is already closed (peer-reset), but continue to do the selection cycle on a key set whose associated channel is alreay closed or invalid.                // Hence, selector's slect(..) keep spinging with zero return without blocking for the timeout period
                //该bug会导致Selector一直空轮询,最终导致cpu 100%,nio server不可用,严格意义上来说,netty没有解决jdk的bug,                // 而是通过一种方式来巧妙地避开了这个bug,具体做法如下                //netty 会在每次进行 selector.select(timeoutMillis) 之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,                // 判断select操作是否至少持续了timeoutMillis秒(这里将time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos                // 改成time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或许更好理解一些),                //如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,                // 当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector                long time = System.nanoTime();                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {                    // timeoutMillis elapsed without anything selected.                    selectCnt = 1;                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                    // The selector returned prematurely many times in a row.                    // Rebuild the selector to work around the problem.                    logger.warn(                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",                            selectCnt, selector);
                    rebuildSelector();                    selector = this.selector;
                    // Select again to populate selectedKeys.                    selector.selectNow();                    selectCnt = 1;                    break;                }
                currentTimeNanos = time;            }
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {                if (logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                            selectCnt - 1, selector);                }            }        } catch (CancelledKeyException e) {            if (logger.isDebugEnabled()) {                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",                        selector, e);            }            // Harmless exception - log anyway        }    }

processSelectedKeys()

private void processSelectedKeys() {        if (selectedKeys != null) {            processSelectedKeysOptimized();        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }
private void processSelectedKeysOptimized() {        for (int i = 0; i < selectedKeys.size; ++i) {            final SelectionKey k = selectedKeys.keys[i];            // null out entry in the array to allow to have it GC'ed once the Channel close            // See https://github.com/netty/netty/issues/2363            selectedKeys.keys[i] = null;
            final Object a = k.attachment();            //在ServerBootStrap初始化创建NioServerSocketChannel,对NioServerSocketChannel进行注册时会将NioServerSocketChannel的引用放入attachment中,这里是取出对应的引用            //在创建SocketChannel,对SocketChannel进行注册时也将SocketChannel的引用放入attachment中去。            if (a instanceof AbstractNioChannel) {                processSelectedKey(k, (AbstractNioChannel) a);            } else {                @SuppressWarnings("unchecked")                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);            }            //是否需要再一次select            if (needsToSelectAgain) {                // null out entries in the array to allow to have it GC'ed once the Channel close                // See https://github.com/netty/netty/issues/2363                selectedKeys.reset(i + 1);
                selectAgain();                i = -1;            }        }    }    

在创建SocketChannel,对SocketChannel进行注册时会将SocketChannel的引用放入attachment中去:

 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();        if (!k.isValid()) {            final EventLoop eventLoop;            try {                eventLoop = ch.eventLoop();            } catch (Throwable ignored) {                // If the channel implementation throws an exception because there is no event loop, we ignore this                // because we are only trying to determine if ch is registered to this event loop and thus has authority                // to close ch.                return;            }            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is            // still healthy and should not be closed.            // See https://github.com/netty/netty/issues/5125            if (eventLoop != this || eventLoop == null) {                return;            }            // close the channel if the key is not valid anymore            unsafe.close(unsafe.voidPromise());            return;        }
        try {            int readyOps = k.readyOps();            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise            // the NIO JDK channel implementation may throw a NotYetConnectedException.            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking                // See https://github.com/netty/netty/issues/924                int ops = k.interestOps();                ops &= ~SelectionKey.OP_CONNECT;                k.interestOps(ops);
                unsafe.finishConnect();            }
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.            if ((readyOps & SelectionKey.OP_WRITE) != 0) {                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write                ch.unsafe().forceFlush();            }
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead            // to a spin loop            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {                //进入的是                unsafe.read();            }        } catch (CancelledKeyException ignored) {            unsafe.close(unsafe.voidPromise());        }    }

这里我们再回过头看下NioSocketChannel的构造:

 public NioSocketChannel(Channel parent, SocketChannel socket) {        super(parent, socket);        config = new NioSocketChannelConfig(this, socket.socket());    }
  protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {          //读的          super(parent, ch, SelectionKey.OP_READ);      }  protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {          super(parent);          this.ch = ch;          this.readInterestOp = readInterestOp;          try {              ch.configureBlocking(false);          } catch (IOException e) {              try {                  ch.close();              } catch (IOException e2) {                  if (logger.isWarnEnabled()) {                      logger.warn(                              "Failed to close a partially initialized socket.", e2);                  }              }
              throw new ChannelException("Failed to enter non-blocking mode.", e);          }      }      

可以看出SocketChannel的readInterestOp值为SelectionKey.OPREAD。当有OPREAD事件时会进入unsafe.read(),对于SocketChannel来说对应的unsafe是io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe。

分析

分析到这里,我们就不得不把nio的demo拿过来比较一下了:

  while(selector.select() > 0){     Set<SelectionKey> keySet = selector.selectedKeys();     Iterator<SelectionKey> it = keySet.iterator();     SelectionKey key = null;     while(it.hasNext()){
         key = it.next();         /*防止下次select方法返回已处理过的通道*/         it.remove();
         /*若发现异常,说明客户端连接出现问题,但服务器要保持正常*/         try{             /*ssc通道只能对链接事件感兴趣*/             if(key.isAcceptable()){
                 /*accept方法会返回一个普通通道,                      每个通道在内核中都对应一个socket缓冲区*/                 SocketChannel sc = ssc.accept();                 sc.configureBlocking(false);

当selector执行一次select()之后,如果selector.select()>0那么就可以通过selector.selectedKeys()获取到keySet。然后对keySet进行处理。

在NioEventLoop的构造方法中会调用openSelector()对selector进行初始化,在openSelector中有这样一段代码:

io.netty.channel.nio.NioEventLoop#openSelector:
 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {            @Override            public Object run() {                try {                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);                    if (cause != null) {                        return cause;                    }                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);                    if (cause != null) {                        return cause;                    }
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);                    return null;                } catch (NoSuchFieldException e) {                    return e;                } catch (IllegalAccessException e) {                    return e;                }            }        });
  //是SelectedSelectionKeySet对象         selectedKeys = selectedKeySet;       

通过反射,将selector的selectedKeys属性和publicSelectedKeys属性修改为selectedKeySet(这样做的好处是用单数组代替hashset,简化结构的同时,线性查找selectkey),并且让selectedKeys = selectedKeySet。那也就是说在每次执行selector.select()之后,都可以通过selectedKeySet来取到selectedKeys,正如processSelectedKeysOptimized中做的那样。这样当有SelectionKey.OP_READ进入时,会 进入 unsafe.read()方法,而SocketChannel的unsafe对应的是io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe,它的read方法:

  @Override        public final void read() {            final ChannelConfig config = config();            final ChannelPipeline pipeline = pipeline();            final ByteBufAllocator allocator = config.getAllocator();            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();            allocHandle.reset(config);
            ByteBuf byteBuf = null;            boolean close = false;            try {                do {                    byteBuf = allocHandle.allocate(allocator);                    allocHandle.lastBytesRead(doReadBytes(byteBuf));                    if (allocHandle.lastBytesRead() <= 0) {                        // nothing was read. release the buffer.                        byteBuf.release();                        byteBuf = null;                        close = allocHandle.lastBytesRead() < 0;                        break;                    }
                    allocHandle.incMessagesRead(1);                    readPending = false;                    pipeline.fireChannelRead(byteBuf);                    byteBuf = null;                } while (allocHandle.continueReading());
                allocHandle.readComplete();                pipeline.fireChannelReadComplete();
                if (close) {                    closeOnRead(pipeline);                }            } catch (Throwable t) {                handleReadException(pipeline, byteBuf, t, close, allocHandle);            } finally {                // Check if there is a readPending which was not processed yet.                // This could be for two reasons:                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method                //                // See https://github.com/netty/netty/issues/2254                if (!readPending && !config.isAutoRead()) {                    removeReadOp();                }            }        }

这里关注下pipeline.fireChannelRead(byteBuf)方法,会进入到SocketChannel的pipeline,从head到tail节点执行一遍handler的channelRead操作。以我的WebSocketServerHandler为例:

  @Override    public void channelRead0(ChannelHandlerContext ctx, Object msg) {        if (msg instanceof FullHttpRequest) {            handleHttpRequest(ctx, (FullHttpRequest) msg);        } else if (msg instanceof WebSocketFrame) {            handleWebSocketFrame(ctx, (WebSocketFrame) msg);        }    }
    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
            // Check for closing frame            if (frame instanceof CloseWebSocketFrame) {                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());                return;            }            if (frame instanceof PingWebSocketFrame) {                ctx.write(new PongWebSocketFrame(frame.content().retain()));                return;            }            if (frame instanceof TextWebSocketFrame) {                // Echo the frame                ctx.write(frame.retain());                return;            }            if (frame instanceof BinaryWebSocketFrame) {                // Echo the frame                ctx.write(frame.retain());            }        }

在建立连接后,如果客户端发送了一条消息,会进入到pipeline.fireChannelRead(byteBuf)方法,最终执行到WebSocketServerHandler的channelRead0方法,如果是TextWebSocketFrame, 会使用ctx.write(frame.retain())向客户端写一条数据,这时最终会调用到DefaultPipline的write方法:

  @Override        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {            unsafe.write(msg, promise);        }

最后还是通过unsafe进行write操作的,io.netty.channel.AbstractChannel.AbstractUnsafe#write:

 @Overridepublic final void write(Object msg, ChannelPromise promise) {    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        // If the outboundBuffer is null we know the channel was closed and so        // need to fail the future right away. If it is not null the handling of the rest        // will be done in flush0()        // See https://github.com/netty/netty/issues/2362        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);        // release message now to prevent resource-leak        ReferenceCountUtil.release(msg);        return;    }
    int size;    try {        msg = filterOutboundMessage(msg);        size = pipeline.estimatorHandle().size(msg);        if (size < 0) {            size = 0;        }    } catch (Throwable t) {        safeSetFailure(promise, t);        ReferenceCountUtil.release(msg);        return;    }
    outboundBuffer.addMessage(msg, size, promise);}

你会发现这里只是将要发送的数据放入了outboundBuffer中,并没有往外发送,这是因为在io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe的read方法中:

 do {    byteBuf = allocHandle.allocate(allocator);    allocHandle.lastBytesRead(doReadBytes(byteBuf));    if (allocHandle.lastBytesRead() <= 0) {        // nothing was read. release the buffer.        byteBuf.release();        byteBuf = null;        close = allocHandle.lastBytesRead() < 0;        break;    }
    allocHandle.incMessagesRead(1);    readPending = false;    pipeline.fireChannelRead(byteBuf);    byteBuf = null;} while (allocHandle.continueReading());
allocHandle.readComplete();pipeline.fireChannelReadComplete();

在pipeline.fireChannelRead(byteBuf)执行完成之后,还会执行pipeline.fireChannelReadComplete(),这个方法也是从pipeline的head到tail执行,最终会执行WebSocketServerHandler的 channelReadComplete方法:

 @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }

重点在于ctx.flush()方法,它最终会调用io.netty.channel.DefaultChannelPipeline.HeadContext#flush方法:

 @Overridepublic void flush(ChannelHandlerContext ctx) throws Exception {    unsafe.flush();}

io.netty.channel.AbstractChannel.AbstractUnsafe#flush方法:

 @Overridepublic final void flush() {    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        return;    }
    outboundBuffer.addFlush();    flush0();}

从buffer中取出数据进行flush。

先调用io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#flush0:

 @Overrideprotected final void flush0() {    // Flush immediately only when there's no pending flush.    // If there's a pending flush operation, event loop will call forceFlush() later,    // and thus there's no need to call it now.    if (isFlushPending()) {        return;    }    super.flush0();}
 private boolean isFlushPending() {    SelectionKey selectionKey = selectionKey();    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;}

这里selectionKey.interestOps() & SelectionKey.OPWRITE中selectionKey.interestOps()值为1,SelectionKey.OPWRITE值为4,那么(selectionKey.interestOps() & SelectionKey.OP_WRITE)的值为0。

也就是说如果注册了selectionKey.interestOps为SelectionKey.OP_WRITE,那么将不会在这里进行flush,eventLoop会调用forceFlush来刷新。io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)方法:

if ((readyOps & SelectionKey.OP_WRITE) != 0) {    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write    ch.unsafe().forceFlush();}

然后调用super.flush0() io.netty.channel.AbstractChannel.AbstractUnsafe#flush0:

  protected void flush0() {    if (inFlush0) {        // Avoid re-entrance        return;    }
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null || outboundBuffer.isEmpty()) {        return;    }
    inFlush0 = true;
    // Mark all pending write requests as failure if the channel is inactive.    if (!isActive()) {        try {            if (isOpen()) {                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);            } else {                // Do not trigger channelWritabilityChanged because the channel is closed already.                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);            }        } finally {            inFlush0 = false;        }        return;    }
    try {        doWrite(outboundBuffer);    } catch (Throwable t) {

这里doWrite方法是关键,我们跟进去io.netty.channel.socket.nio.NioSocketChannel#doWrite:

 @Override    protected void doWrite(ChannelOutboundBuffer in) throws Exception {        for (;;) {            int size = in.size();            if (size == 0) {                // All written so clear OP_WRITE                clearOpWrite();                break;            }            long writtenBytes = 0;            boolean done = false;            boolean setOpWrite = false;
            // Ensure the pending writes are made of ByteBufs only.            ByteBuffer[] nioBuffers = in.nioBuffers();            int nioBufferCnt = in.nioBufferCount();            long expectedWrittenBytes = in.nioBufferSize();            SocketChannel ch = javaChannel();
            // Always us nioBuffers() to workaround data-corruption.            // See https://github.com/netty/netty/issues/2761            switch (nioBufferCnt) {                case 0:                    // We have something else beside ByteBuffers to write so fallback to normal writes.                    super.doWrite(in);                    return;                case 1:                    // Only one ByteBuf so use non-gathering write                    ByteBuffer nioBuffer = nioBuffers[0];                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {                        final int localWrittenBytes = ch.write(nioBuffer);                        if (localWrittenBytes == 0) {                            setOpWrite = true;                            break;                        }                        expectedWrittenBytes -= localWrittenBytes;                        writtenBytes += localWrittenBytes;                        if (expectedWrittenBytes == 0) {                            done = true;                            break;                        }                    }                    break;                default:                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);                        if (localWrittenBytes == 0) {                            setOpWrite = true;                            break;                        }                        expectedWrittenBytes -= localWrittenBytes;                        writtenBytes += localWrittenBytes;                        if (expectedWrittenBytes == 0) {                            done = true;                            break;                        }                    }                    break;            }
            // Release the fully written buffers, and update the indexes of the partially written buffer.            in.removeBytes(writtenBytes);
            if (!done) {                // Did not write all buffers completely.                //如果数据量很大,则去注册OP_WRITE事件                incompleteWrite(setOpWrite);                break;            }        }    }

最终调用的是ch.write进行写出。这里可能有个疑问,为啥正常写没有注册OPWRITE事件? 认为网络不出现阻塞情况下,一直是可以写的,一般我们不注册OPWRITE事件.一般在当Socket的缓冲区繁忙、网络因素等不可写的时候才会注册OP_WRITE事件。

总结

  • 大致流程是reactor线程轮询到有ACCEPT事件后创建NioSocketChannel,然后进入pipeline,在ServerBootStrapAcceptor的channelRead事件中进行 NioSocketChannel的pipeline的添加(添加的是用户在bootstrap层传入的childHandler),然后使用workerGroup进行NioSocketChannel的注册。 注册的核心是围绕着NioEventLoop进行的,它在初始化workerGroup对应的NioEventLoopGroup时创建的了ThreadPerTaskExecutor作为该group的executor,另一方面创建了 cpu核数*2个NioEventLoop,在通过eventloop执行NioSocketChannel的注册时,使用chooser来随机选择一个eventloop来完成,会先确保NioEventLoop都已经启动,具体方式是通过executor调用eventLoop的run方法, run方法是一个死循环,会不停地进行selector的select操作,监听与该EventLoop的selector绑定的channel上的事件。另一方面NioSocketchannel的注册工作会封装成task会被eventLoop的run方法中执行,注册时与nioEventLoop中的selector属性完成了绑定。 写的操作也是通过NioSocketChannel的unsafe类是通过NioSocketChannel的doWrite方法来进行的。

本文分享自微信公众号 - 开发架构二三事(gh_d6f166e26398),作者:两个小灰象

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-22

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Elasticsearch源码分析八之锁管理工具KeyLock

    KeyedLock的map属性是存放资源标识和KeyLock的容器,也就是一个大的锁容器。KeyLock为每一个资源标识对应的锁对象,它继承自Reentrant...

    开发架构二三事
  • 关于netty你需要了解的二三事.md

    在nio编程中,select和bind可以不按顺序调用,也可以不在同一个线程中。netty中这是在boss线程中做的事情,可能会出现先select再绑定端口的情...

    开发架构二三事
  • java8中skiplist的实现及源码分析

    这个类是实现了一个类似于树的二维连接的跳表,它的索引级别是放在分割开的节点里面的,基础节点拥有所有的数据。用这个便利的数据结构代替数组结构的原因主要有两点:

    开发架构二三事
  • PAT 1012 The Best Rank

    1012. The Best Rank (25) 时间限制 400 ms 内存限制 65536 kB 代码长度限制 16000 B ...

    ShenduCC
  • 百万级高并发mongodb集群性能数十倍提升优化实践(上篇)

    线上某集群峰值TPS超过100万/秒左右(主要为写流量,读流量很低),峰值tps几乎已经到达集群上限,同时平均时延也超过100ms,随着读写流量的进一步增加,时...

    美的让人心动
  • LinkedList源码分析(基于Java8)内部结构构造方法添加2检索3删除4迭代器5 例子6总结

    JavaEdge
  • 1-6 银行业务队列简单模拟 (25 分)

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    韩旭051
  • 源码分析Dubbo事件派发机制

    从官网我们得知,Dubbo协议是使用单一长连接来进行网络传输,也就是说服务调用方持久与服务提供者建立一条连接,所有的服务调用调用信息通过。 一条TCP连接进行传...

    丁威
  • C++核心准则C.181:避免使用"暴露的"联合体

    A naked union is a union without an associated indicator which member (if any) i...

    面向对象思考
  • 移动安全-应用加固命令行工具jar包使用说明

    为方便用户快速集成到自动化任务中,乐固加固推出命令行工具。该工具是一个jar包(支持基础版、专业版、企业版),提供上传apk、下载加固包功能。下载的加固包需要用...

    腾讯云@移动安全

扫码关注云+社区

领取腾讯云代金券