Netty学习三

Netty学习三

前面我们已经知道Netty服务端启动的时候最重要的是进行bind操作,这个操作不仅进行了run()操作进行死循环,而且将线程任务添加到队列中,进行runAllTasks操作。

首先,我们可以看Netty的架构图,图片来自即时通讯网:

reactor线程模型图,图片来自即时通讯网:

下面是跟踪源码的流程操作:

 AbstractBootstrap#bind(int inetPort)-> AbstractBootstrap# bind(SocketAddress localAddress)-> 
 AbstractBootstrap#doBind(final SocketAddress localAddress)->AbstractBootstrap#doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) 注意这个方法,里面先进行绑定,然后添加监听->
              ChannelOutboundInvoker#bind(SocketAddress localAddress, ChannelPromise promise)-> 
             重要 AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)->
              DefaultChannelPipeline#bind(SocketAddress localAddress, ChannelPromise promise)->
              AbstractChannelHandlerContext#bind(final SocketAddress localAddress, final ChannelPromise promise)->
            AbstractChannelHandlerContext#invokeBind(SocketAddress localAddress, ChannelPromise promise)
              ——>ChannelOutboundInvoker#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)      
进入LoggingHandler的操作
          ->重要,调用LoggingHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)#ctx.bind(localAddress, promise)
          -> AbstractChannelHandlerContext#bind(final SocketAddress localAddress, final ChannelPromise promise)#next.invokeBind(localAddress, promise)
          ->AbstractChannelHandlerContext#invokeBind(SocketAddress localAddress, ChannelPromise promise)#bind(this, localAddress, promise)
          ->ChannelOutboundInvoker#bind(SocketAddress localAddress, ChannelPromise promise)
          ->重要 DefaultChannelPipeline#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)#unsafe.bind(localAddress, promise)
          -> 重要 AbstractChannel#bind(final SocketAddress localAddress, final ChannelPromise promise)

          ->javaChannel()->NioServerSocketChannel#doBind(SocketAddress localAddress)

          ->AbstractChannel#invokeLater(Runnable task)# eventLoop().execute(task)
          ->SingleThreadEventExecutor#execute(Runnable task)
          #execute(Runnable task, boolean immediate)#addTask(Runnable task)#wakeup(inEventLoop)#safeSetSuccess(promise)
          ->AbstractChannel#safeSetSuccess(ChannelPromise promise)
          ->此时会将结果往回抛AbstractChannelHandlerContext->LoggingHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) 
          ->DefaultChannelPromise#addListener(GenericFutureListener<? extends Future<? super Void>> listener)
          ->DefaultPromise#addListener(GenericFutureListener<? extends Future<? super V>> listener)#isDone()#notifyListeners()
          #executor()
          ->AbstractBootstrap#executor()#super.executor()
          ->DefaultChannelPromise#channel().eventLoop()
          ->AbstractNioChannel#eventLoop()
          ->AbstractChannel#eventLoop()
          ->notifyListenersNow()
          ->FastThreadLocalRunnable#run()
          ->ThreadExecutorMap#apply(final Runnable command, final EventExecutor eventExecutor)#command.run()
          ->SingleThreadEventExecutor#doStartThread()#SingleThreadEventExecutor.this.run()
          ->重要 NioEventLoop#run()#runAllTasks(long timeoutNanos)#runTasks ++#task = pollTask()#pollTaskFrom(Queue<Runnable> taskQueue)
          ->NioEventLoop#safeExecute(Runnable task)#fireChannelActive() 重要#invokeChannelActive()的((ChannelInboundHandler) handler()).channelActive(this);

跟踪完之后,回忆一下,Netty的操作,首先启动服务,此时已经启动了服务,还需要建立连接。

进行绑定操作的重要步骤:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();  
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
            "A non-root user can't receive a broadcast packet if the socket " +
            "is not bound to a wildcard address; binding to a non-wildcard " +
            "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                //重要,进行active操作
                pipeline.fireChannelActive();
            }
        });
    }
    //关键地方,此时会一步一步的返回pipeline操作
    safeSetSuccess(promise);
}

此时我们可以看到EventLoop是一个重要的类,我们的大部分操作都是在NioEventLoop中完成的操作。EventLoop的作用是一个死循环,而这个循环做了下面几件事:

有条件的等待Nio事件

处理Nio事件

处理消息队列中的任务

@Override
    protected void run() {
        int selectCnt = 0;
        //自旋操作
        for (;;) {
            try {
                int strategy;
                try {
                    //计算策略
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    //匹配策略
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        //运行task 重要
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }


 //运行AllTasks操作
 protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

接收请求,进行多路复用的key

NioEventLoop#processSelectedKey(SelectionKey k, AbstractNioChannel ch)#unsafe.read()->AbstractNioMessageChannel#read()
-> NioServerSocketChannel#doReadMessages(List<Object> buf)#
SocketChannel ch = SocketUtils.accept(javaChannel())#SocketUtils#accept(final ServerSocketChannel serverSocketChannel)#
ServerSocketChannel#accept()->sun公司的ServerSocketChannelImpl#accept(),此方法完成连接操作
->关注doReadMessages(List<Object> buf)中的pipeline.fireChannelRead(readBuf.get(i))方法,此方法完成了read操作

最终,会调用到AbstractNioChannel的doBeginRead方法。

下面我们来看一下,首先我们在网页中输入http://localhost:8007/,会看到客户端发出请求,断掉会进入到unsafe.read()操作:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            //重要
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

进入到,这个方法是重要的:

private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

进而进入:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

  public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }


  public SocketChannel accept() throws IOException {
        Object var1 = this.lock;
        synchronized(this.lock) {
            if (!this.isOpen()) {
                throw new ClosedChannelException();
            } else if (!this.isBound()) {
                throw new NotYetBoundException();
            } else {
                SocketChannelImpl var2 = null;
                int var3 = 0;
                FileDescriptor var4 = new FileDescriptor();
                InetSocketAddress[] var5 = new InetSocketAddress[1];

                InetSocketAddress var6;
                try {
                    this.begin();
                    if (!this.isOpen()) {
                        var6 = null;
                        return var6;
                    }

                    this.thread = NativeThread.current();

                    do {
                        var3 = this.accept(this.fd, var4, var5);
                    } while(var3 == -3 && this.isOpen());
                } finally {
                    this.thread = 0L;
                    this.end(var3 > 0);

                    assert IOStatus.check(var3);

                }

                if (var3 < 1) {
                    return null;
                } else {
                    IOUtil.configureBlocking(var4, true);
                    var6 = var5[0];
                    var2 = new SocketChannelImpl(this.provider(), var4, var6);
                    SecurityManager var7 = System.getSecurityManager();
                    if (var7 != null) {
                        try {
                            var7.checkAccept(var6.getAddress().getHostAddress(), var6.getPort());
                        } catch (SecurityException var13) {
                            var2.close();
                            throw var13;
                        }
                    }

                    return var2;
                }
            }
        }
    }

此时会调用jdk的SocketChannel,最终调用sun公司的ServerSocketChannel的accept操作。

接着来分析

 for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
 }

会调用:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

 private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

调用ChannelRead:

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        //执行register操作
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

进入AbstactChannel的register:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

进而进入我们需要看的doBeginRead:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        //进行注册
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                //重要
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

进入doBeginRead:

@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

进入doBeginRead方法:

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        //完成感兴趣操作
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

可以在jdk的SelectionKeyImpl看到:

public SelectionKey interestOps(int var1) {
    this.ensureValid();
    return this.nioInterestOps(var1);
}

此时完成了doBeginRead操作。

也即首先启动关注bind操作,完成启动之后,进行accept操作,然后进行read操作。

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-07-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • LinkedBlockingQueue源码学习

    采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点:

    路行的亚洲
  • ConcurrentHashMap源码学习

    既然有了HashMap为什么还会出现ConcurrentHashMap?同时ConcurrentHashMap具有什么优势?ConcurrentHashMap与...

    路行的亚洲
  • RocketMQ学习5

    进行消息发送的过程首先会准备好路由信息,最终是由netty完成的,也即使用nettyRemotingClient来实现的。

    路行的亚洲
  • 8. SOFAJRaft源码分析— JRaft是如何实现日志复制的?

    前几天和腾讯的大佬一起吃饭聊天,说起我对SOFAJRaft的理解,我自然以为我是很懂了的,但是大佬问起了我那SOFAJRaft集群之间的日志是怎么复制的? 我...

    luozhiyun
  • HttpComponents HttpClient连接池(2)-连接的申请

    在上一篇文章里我们主要介绍了 httpclient 连接池的关键类和数据结构,在这里我们主要介绍http连接的申请和释放。

    TA码字
  • Spring Boot中通过CORS解决跨域问题

    很多人对跨域有一种误解,以为这是前端的事,和后端没关系,其实不是这样的,说到跨域,就不得不说说浏览器的同源策略。 同源策略是由Netscape提出的一个著名的安...

    江南一点雨
  • 19届前端实习生面经

    牛客网
  • Omega系统简介

    1.背景 Google的论文Omega:flexible,scalable schedulers for large compute clusters中把调度分...

    大数据和云计算技术
  • SQL基础日期函数

    用户1112962
  • GPU高性能编程CUDA实战(二)

    在上一篇文章中:CUDA8.0+VS2015+Win10开发环境搭建教程中已经介绍了CUDA工程的配置与安装。本篇文章是对CUDA工程的配置作进一步介绍与补充说...

    3D视觉工坊

扫码关注云+社区

领取腾讯云代金券