Netty Study Notes (Part 3)

路行的亚洲
Published 2020-07-16 15:56:47
Included in the column: 后端技术学习


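As we saw earlier, the most important step when a Netty server starts is the bind operation. It not only starts the event loop thread, whose run() method loops forever, but also adds tasks to the task queue, which the loop then executes through runAllTasks.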

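First, let's look at Netty's architecture diagram and its Reactor threading model.

[Figure: Netty architecture diagram, from 即时通讯网]

[Figure: Reactor threading model, from 即时通讯网]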

Below is the call flow obtained by tracing the source code:

 AbstractBootstrap#bind(int inetPort)->AbstractBootstrap#bind(SocketAddress localAddress)->
 AbstractBootstrap#doBind(final SocketAddress localAddress)->AbstractBootstrap#doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) (note this method: it binds first, then adds a listener)->
              ChannelOutboundInvoker#bind(SocketAddress localAddress, ChannelPromise promise)->
              (important) AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)->
              DefaultChannelPipeline#bind(SocketAddress localAddress, ChannelPromise promise)->
              AbstractChannelHandlerContext#bind(final SocketAddress localAddress, final ChannelPromise promise)->
              AbstractChannelHandlerContext#invokeBind(SocketAddress localAddress, ChannelPromise promise)
              ->ChannelOutboundHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
Entering the LoggingHandler:
          ->(important) LoggingHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)#ctx.bind(localAddress, promise)
          ->AbstractChannelHandlerContext#bind(final SocketAddress localAddress, final ChannelPromise promise)#next.invokeBind(localAddress, promise)
          ->AbstractChannelHandlerContext#invokeBind(SocketAddress localAddress, ChannelPromise promise)#bind(this, localAddress, promise)
          ->ChannelOutboundInvoker#bind(SocketAddress localAddress, ChannelPromise promise)
          ->(important) DefaultChannelPipeline.HeadContext#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)#unsafe.bind(localAddress, promise)
          ->(important) AbstractChannel.AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise)

          ->javaChannel()->NioServerSocketChannel#doBind(SocketAddress localAddress)

          ->AbstractChannel#invokeLater(Runnable task)# eventLoop().execute(task)
          ->SingleThreadEventExecutor#execute(Runnable task)
          #execute(Runnable task, boolean immediate)#addTask(Runnable task)#wakeup(inEventLoop)#safeSetSuccess(promise)
          ->AbstractChannel#safeSetSuccess(ChannelPromise promise)
          ->at this point the result propagates back through AbstractChannelHandlerContext->LoggingHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
          ->DefaultChannelPromise#addListener(GenericFutureListener<? extends Future<? super Void>> listener)
          ->DefaultPromise#addListener(GenericFutureListener<? extends Future<? super V>> listener)#isDone()#notifyListeners()
          #executor()
          ->AbstractBootstrap#executor()#super.executor()
          ->DefaultChannelPromise#channel().eventLoop()
          ->AbstractNioChannel#eventLoop()
          ->AbstractChannel#eventLoop()
          ->notifyListenersNow()
          ->FastThreadLocalRunnable#run()
          ->ThreadExecutorMap#apply(final Runnable command, final EventExecutor eventExecutor)#command.run()
          ->SingleThreadEventExecutor#doStartThread()#SingleThreadEventExecutor.this.run()
          ->(important) NioEventLoop#run()#runAllTasks(long timeoutNanos)#runTasks ++#task = pollTask()#pollTaskFrom(Queue<Runnable> taskQueue)
          ->NioEventLoop#safeExecute(Runnable task)#fireChannelActive() (important)#invokeChannelActive(), which calls ((ChannelInboundHandler) handler()).channelActive(this);

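With the trace done, let's recap how Netty works: it first starts the server. At this point the server is running, but connections still have to be established.

The key steps of the bind operation: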

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();  
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
            "A non-root user can't receive a broadcast packet if the socket " +
            "is not bound to a wildcard address; binding to a non-wildcard " +
            "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // Important: fire the channelActive event
                pipeline.fireChannelActive();
            }
        });
    }
    // Key point: completing the promise here lets the call return step by step back through the pipeline
    safeSetSuccess(promise);
}
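To make the ordering in this method easier to see, here is a minimal sketch in plain Java. It is a model, not Netty code: the class `BindOrderingSketch`, the method `simulateBind`, and the local `taskQueue` are hypothetical names introduced for illustration. It only mirrors what the listing above reads as: the bind itself happens synchronously, the channelActive notification is merely queued by invokeLater, and the promise is completed before that queued task runs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the ordering inside bind(); none of these names are Netty APIs.
class BindOrderingSketch {

    static List<String> simulateBind() {
        // Stands in for the event loop's task queue (an assumption of this sketch).
        Queue<Runnable> taskQueue = new ArrayDeque<>();
        List<String> events = new ArrayList<>();

        events.add("doBind");                              // the socket is bound synchronously
        taskQueue.add(() -> events.add("channelActive"));  // invokeLater(...): only queued
        events.add("promiseSuccess");                      // safeSetSuccess(promise)

        // A later iteration of the loop drains the queue and runs the deferred task.
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(simulateBind()); // [doBind, promiseSuccess, channelActive]
    }
}
```

Deferring channelActive this way means the code that called bind sees its promise completed first, and handlers are notified on a later pass of the loop.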

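Here we can see that EventLoop is an important class: most of the work is done inside NioEventLoop. An EventLoop is essentially an infinite loop, and each iteration does the following:

Conditionally wait for NIO events

Process NIO events

Run the tasks in the task queue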
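The three steps above can be sketched with nothing but the JDK. This is a simplified illustration, not Netty's NioEventLoop: `MiniEventLoop`, `execute`, `shutdown`, and `runTasks` are hypothetical names, and the one-second select timeout is an arbitrary assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature event loop; a sketch of the idea, not Netty's implementation.
class MiniEventLoop implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private volatile boolean shuttingDown;

    MiniEventLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Queue a task and wake the selector so that a blocked select() returns.
    void execute(Runnable task) {
        tasks.add(task);
        selector.wakeup();
    }

    void shutdown() {
        shuttingDown = true;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (!shuttingDown) {
                // 1. Conditionally wait: block only when no task is pending.
                if (tasks.isEmpty()) {
                    selector.select(1000);
                } else {
                    selector.selectNow();
                }
                // 2. Process ready I/O events (a real loop would dispatch on readyOps()).
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    it.next();
                    it.remove();
                }
                // 3. Run the queued tasks.
                Runnable task;
                while ((task = tasks.poll()) != null) {
                    task.run();
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Starts a loop, submits n tasks, shuts the loop down, and returns how many tasks ran.
    static int runTasks(int n) {
        MiniEventLoop loop = new MiniEventLoop();
        Thread thread = new Thread(loop, "mini-event-loop");
        thread.start();
        AtomicInteger ran = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            loop.execute(() -> {
                ran.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await();
            loop.shutdown();
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + runTasks(3));
    }
}
```

The wakeup() call in execute is what keeps a freshly queued task from waiting out the full select timeout.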

@Override
    protected void run() {
        int selectCnt = 0;
        // Loop forever
        for (;;) {
            try {
                int strategy;
                try {
                    // Calculate the select strategy
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    // Dispatch on the strategy
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // Important: run the queued tasks within a time budget
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }


 // Run queued tasks until the queue is empty or the time budget is used up
 protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
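Two small pieces of arithmetic in the listings above are worth isolating: the task time budget `ioTime * (100 - ioRatio) / ioRatio`, and the `(runTasks & 0x3F) == 0` test that checks the deadline only on every 64th task. The sketch below reproduces just those expressions; the class `IoRatioBudget` and its method names are hypothetical.

```java
// Hypothetical helper that isolates two expressions from the listings above.
class IoRatioBudget {

    // Time allowed for tasks, given the time just spent on I/O and an ioRatio in 1..99.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 for this formula");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    // True on every 64th task: 0x3F masks the low six bits of the counter.
    static boolean shouldCheckDeadline(long runTasks) {
        return (runTasks & 0x3F) == 0;
    }

    public static void main(String[] args) {
        System.out.println(taskBudgetNanos(1_000_000, 50)); // 1000000
        System.out.println(taskBudgetNanos(1_000_000, 80)); // 250000
        System.out.println(shouldCheckDeadline(64));        // true
        System.out.println(shouldCheckDeadline(65));        // false
    }
}
```

With ioRatio at 50, tasks get as much time as I/O took; at 80, tasks get a quarter of the I/O time. Because the deadline is consulted only every 64 tasks, a batch can overrun its budget by up to 63 tasks, which the comment in the source accepts since nanoTime() is relatively expensive.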

Accepting a request: processing the selected keys of the multiplexer

NioEventLoop#processSelectedKey(SelectionKey k, AbstractNioChannel ch)#unsafe.read()->AbstractNioMessageChannel#read()
-> NioServerSocketChannel#doReadMessages(List<Object> buf)#
SocketChannel ch = SocketUtils.accept(javaChannel())#SocketUtils#accept(final ServerSocketChannel serverSocketChannel)#
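ServerSocketChannel#accept()->Sun's ServerSocketChannelImpl#accept(), which accepts the connection
->then note the call pipeline.fireChannelRead(readBuf.get(i)) in NioMessageUnsafe#read(), made after doReadMessages(List<Object> buf) returns; it passes each accepted channel into the pipeline as a read event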

Eventually, the doBeginRead method of AbstractNioChannel is called.

Let's walk through it. Enter http://localhost:8007/ in a browser; the client sends a request, and a breakpoint will stop at the unsafe.read() call:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // Important
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
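The readyOps checks above are plain bit tests against the `SelectionKey` constants. As a rough illustration, the sketch below maps a readyOps value to the actions the method would take, in the same order. `ReadyOpsDemo` and `actionsFor` are hypothetical names, and the strings merely label the calls seen in the listing.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder that mirrors the order of the bit tests in processSelectedKey.
class ReadyOpsDemo {

    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is also treated as readable, as the source comment explains.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));                       // [read]
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // [forceFlush, read]
        System.out.println(actionsFor(0));                                            // [read]
    }
}
```

For a server channel the interesting case is OP_ACCEPT: it takes the same branch as OP_READ, which is why accepting a connection shows up as unsafe.read().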

Stepping in, we reach NioMessageUnsafe#read(), which is an important method:

private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

From there we step into doReadMessages, then SocketUtils.accept, and finally the JDK's accept():

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

  public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }


  public SocketChannel accept() throws IOException {
        Object var1 = this.lock;
        synchronized(this.lock) {
            if (!this.isOpen()) {
                throw new ClosedChannelException();
            } else if (!this.isBound()) {
                throw new NotYetBoundException();
            } else {
                SocketChannelImpl var2 = null;
                int var3 = 0;
                FileDescriptor var4 = new FileDescriptor();
                InetSocketAddress[] var5 = new InetSocketAddress[1];

                InetSocketAddress var6;
                try {
                    this.begin();
                    if (!this.isOpen()) {
                        var6 = null;
                        return var6;
                    }

                    this.thread = NativeThread.current();

                    do {
                        var3 = this.accept(this.fd, var4, var5);
                    } while(var3 == -3 && this.isOpen());
                } finally {
                    this.thread = 0L;
                    this.end(var3 > 0);

                    assert IOStatus.check(var3);

                }

                if (var3 < 1) {
                    return null;
                } else {
                    IOUtil.configureBlocking(var4, true);
                    var6 = var5[0];
                    var2 = new SocketChannelImpl(this.provider(), var4, var6);
                    SecurityManager var7 = System.getSecurityManager();
                    if (var7 != null) {
                        try {
                            var7.checkAccept(var6.getAddress().getHostAddress(), var6.getPort());
                        } catch (SecurityException var13) {
                            var2.close();
                            throw var13;
                        }
                    }

                    return var2;
                }
            }
        }
    }

At this point the JDK's ServerSocketChannel#accept() is called, which ends up in Sun's ServerSocketChannelImpl#accept().
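The behavior doReadMessages relies on can be observed with the JDK alone: in non-blocking mode, `ServerSocketChannel.accept()` returns null when no connection is pending, which is the case where doReadMessages returns 0. The following is a minimal sketch under assumptions of its own: `NonBlockingAcceptDemo` and its method are hypothetical names, the server binds to an ephemeral loopback port, and the five-second polling limit is arbitrary.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo of non-blocking accept() using only the JDK.
class NonBlockingAcceptDemo {

    // Returns true if accept() yields null before a client connects and a channel afterwards.
    static boolean acceptBeforeAndAfterConnect() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Nothing has connected yet, so a non-blocking accept() returns null.
            boolean nullBefore = server.accept() == null;

            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            try (SocketChannel client = SocketChannel.open(address)) { // blocking connect
                SocketChannel accepted = null;
                long deadline = System.nanoTime() + 5_000_000_000L;
                // Poll until the pending connection is handed over, or give up.
                while (accepted == null && System.nanoTime() < deadline) {
                    accepted = server.accept();
                    if (accepted == null) {
                        Thread.yield();
                    }
                }
                boolean acceptedAfter = accepted != null;
                if (accepted != null) {
                    accepted.close();
                }
                return nullBefore && acceptedAfter;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptBeforeAndAfterConnect());
    }
}
```

An event loop does not poll like this; it waits for the selector to report OP_ACCEPT and only then calls accept(). The polling here just keeps the demo free of a selector.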

Next, let's analyze this loop:

for (int i = 0; i < size; i ++) {
    readPending = false;
    pipeline.fireChannelRead(readBuf.get(i));
}

which calls:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

 private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
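The static invokeChannelRead above follows a pattern that recurs throughout Netty: if the caller is already on the executor's thread, run the action inline; otherwise hand it over as a task. Here is a minimal sketch of that pattern on top of a JDK single-thread executor. `InLoopDispatch`, `dispatch`, and `demo` are hypothetical names, and the thread tracking is a simplification of what an EventExecutor does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration of the "run inline or submit" pattern; not a Netty class.
class InLoopDispatch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "loop");
        loopThread = t; // remember the single worker thread
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run directly when already on the loop thread, otherwise queue the action.
    void dispatch(Runnable action) {
        if (inEventLoop()) {
            action.run();
        } else {
            executor.execute(action);
        }
    }

    static List<String> demo() {
        InLoopDispatch d = new InLoopDispatch();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        // Called from the caller's thread, so this action is queued.
        d.dispatch(() -> {
            log.add("outer on " + Thread.currentThread().getName());
            // Already on the loop thread, so this nested action runs inline.
            d.dispatch(() -> log.add("inner on " + Thread.currentThread().getName()));
            log.add("outer done");
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        d.executor.shutdown();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [outer on loop, inner on loop, outer done]
    }
}
```

The nested call lands between "outer on loop" and "outer done", showing that no queue hop happens once execution is on the loop thread. Keeping every handler call on one thread is what lets handlers avoid their own synchronization.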

This invokes channelRead, here the one in ServerBootstrapAcceptor:

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        // Register the child channel with the child event loop group
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

Next, step into register in AbstractChannel:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

This leads to register0, on the way to the doBeginRead we want to examine:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // Perform the actual registration
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                // Important
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

Enter beginRead:

@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

Enter the doBeginRead method:

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // Add the read operation to the key's interest set
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

In the JDK's SelectionKeyImpl we can see:

public SelectionKey interestOps(int var1) {
    this.ensureValid();
    return this.nioInterestOps(var1);
}
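The interest-set update done by doBeginRead can be tried against plain JDK NIO. The sketch below registers a server channel with an empty interest set and then ORs in `OP_ACCEPT`, using the same guard as the listing. `InterestOpsDemo` and `enableAccept` are hypothetical names, and the local `readInterestOp` only imitates the field of that name; choosing OP_ACCEPT for it is an assumption appropriate to a server channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo of adding an operation to a SelectionKey's interest set.
class InterestOpsDemo {

    // Returns {interestOps before, interestOps after}.
    static int[] enableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Register with an empty interest set: the key exists but selects nothing yet.
            SelectionKey key = server.register(selector, 0);

            int before = key.interestOps();
            int readInterestOp = SelectionKey.OP_ACCEPT; // assumed value for a server channel
            if ((before & readInterestOp) == 0) {
                // OR the bit in so that any other interest bits are preserved.
                key.interestOps(before | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = enableAccept();
        System.out.println(ops[0] + " -> " + ops[1]); // 0 -> 16
    }
}
```

Only after this bit is set will the selector report the channel as ready for that operation, which is why beginRead is the step that actually turns reading on.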

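This completes the doBeginRead operation.

In short: startup centers on the bind operation; once startup is complete, the server performs the accept operation and then the read operation.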

Originally published on 2020-07-05 on the WeChat official account 后端技术学习.
