前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty Review - 服务端channel注册流程源码解析

Netty Review - 服务端channel注册流程源码解析

作者头像
小小工匠
发布2024-05-26 12:18:42
1410
发布2024-05-26 12:18:42
举报
文章被收录于专栏:小工匠聊架构

Pre

Netty Review - ServerBootstrap源码解析

Netty Review - NioServerSocketChannel源码分析


Netty主从Reactor线程模型

Netty 使用主从 Reactor 线程模型来处理并发连接和网络事件。

在 Netty 中,通常有两种类型的线程池:

  1. Boss 线程池:用于接受客户端连接请求,并将接受到的连接注册到 Worker 线程池的 EventLoop 中。Boss 线程池中的线程负责监听 ServerSocketChannel,并将接受到的连接分配给 Worker 线程池中的某个 EventLoop 处理。
  2. Worker 线程池:每个 Worker 线程池包含多个 EventLoop,每个 EventLoop 负责处理一组连接的读写和事件处理。当一个连接被注册到某个 Worker 线程池的 EventLoop 中时,该 EventLoop 将负责处理这个连接的所有事件,包括读取数据、写入数据、处理网络事件等。

主从 Reactor 线程模型的工作流程如下:

  1. 主线程池(Boss 线程池)负责监听 ServerSocketChannel 上的连接请求,并将接受到的连接请求分配给 Worker 线程池中的某个 EventLoop。
  2. Worker 线程池中的每个 EventLoop 都独立负责一组连接的读写和事件处理。当一个连接被注册到某个 EventLoop 上时,该 EventLoop 将会不断地轮询连接上是否有可读事件或可写事件,并在事件发生时进行相应的处理。
  3. 当有读写事件发生时,EventLoop 将调用对应的 ChannelHandler 进行处理。这些 ChannelHandler 可以进行数据解析、业务逻辑处理等操作。
  4. 处理完事件后,EventLoop 可能会将结果写回到连接中,或者关闭连接等。

通过主从 Reactor 线程模型,Netty 可以高效地处理大量的并发连接和网络事件,提高了网络应用程序的性能和可扩展性。


服务端channel注册流程

在Netty中,服务端Channel注册流程涉及以下几个关键步骤:

  1. 创建ServerBootstrap实例: 首先,需要创建一个ServerBootstrap实例,它是Netty提供的用于启动服务端的引导类。
  2. 配置ServerBootstrap: 使用ServerBootstrap实例,设置一系列参数,包括线程模型、Channel类型、处理器等。
  3. 绑定端口并启动服务: 调用ServerBootstrap的bind方法,指定端口并启动服务端。在bind方法内部,会进行以下操作:
    • 创建NioServerSocketChannel实例:用于表示服务端的Channel,内部封装了Java NIO中的ServerSocketChannel。
    • 初始化ChannelPipeline:为NioServerSocketChannel实例创建一个ChannelPipeline对象,用于管理ChannelHandler链。
    • 创建ChannelInitializer并添加到ChannelPipeline:ChannelInitializer是一个特殊的ChannelHandler,它用于在Channel注册到EventLoop之后初始化ChannelPipeline。在ChannelInitializer的initChannel方法中,可以向ChannelPipeline中添加自定义的ChannelHandler。
    • 获取EventLoopGroup并注册Channel:从ServerBootstrap中获取Boss EventLoopGroup,然后调用其register方法注册NioServerSocketChannel到EventLoop上。
  4. 注册Channel到EventLoop: 在调用register方法时,会将NioServerSocketChannel注册到Boss EventLoop上。在注册过程中,会执行以下操作:
    • 获取EventLoop:根据配置,从Boss EventLoopGroup中选择一个EventLoop。
    • 调用EventLoop的register方法:将NioServerSocketChannel注册到选定的EventLoop上。注册过程中,会创建一个NioServerSocketChannelUnsafe实例来处理注册过程,其中会调用底层的Java NIO方法将ServerSocketChannel注册到Selector上,并监听ACCEPT事件。
  5. 事件处理: 一旦NioServerSocketChannel注册到了EventLoop上,就会开始监听ACCEPT事件。当有新的连接接入时,会触发ACCEPT事件,EventLoop会调用相关的ChannelHandler进行处理,如调用ChannelInitializer的initChannel方法,添加用户自定义的ChannelHandler到新的连接的ChannelPipeline中。接着,新的连接就可以接受和处理客户端的请求了。

通过以上流程,服务端Channel在Netty中的注册过程就完成了,它可以接受客户端的连接,并将连接注册到EventLoop上进行事件处理。


源码解读

当我们梳理完

Netty Review - ServerBootstrap源码解析

Netty Review - NioServerSocketChannel源码分析

接下来让我们从下面这一行代码开始

代码语言:javascript
复制
ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
channelFuture.channel().closeFuture().sync();

这段代码用于启动服务端并阻塞当前线程直到服务端关闭。

  1. serverBootstrap.bind(9000):调用serverBootstrapbind()方法绑定端口9000,并返回一个ChannelFuture对象,表示绑定操作的异步结果。
  2. .sync():调用sync()方法阻塞当前线程,直到绑定操作完成。这样做是为了确保服务端在端口绑定完成后再继续执行后续代码。
  3. channelFuture.channel().closeFuture().sync():获取channelFuture中的channel(),然后调用其closeFuture()方法获取一个表示关闭操作的ChannelFuture对象。接着,再次调用sync()方法阻塞当前线程,直到关闭操作完成。这样做是为了让当前线程一直等待直到服务端关闭。

入口 serverBootstrap.bind(port)

这段代码是bind(int inetPort)方法的实现。

代码语言:javascript
复制
/**
 * Create a new {@link Channel} and bind it.
 */
public ChannelFuture bind(int inetPort) {
    // 调用bind方法,传入一个InetSocketAddress对象,该对象使用指定的端口号创建
    return bind(new InetSocketAddress(inetPort));
}

创建一个新的Channel并绑定到指定的端口。


doBind(final SocketAddress localAddress)

这段代码是doBind(final SocketAddress localAddress)方法的实现。

代码语言:javascript
复制
private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化并注册Channel,并返回一个ChannelFuture
    final ChannelFuture regFuture = initAndRegister();
    // 获取注册完成的Channel
    final Channel channel = regFuture.channel();
    // 如果注册过程中发生异常,则直接返回注册的ChannelFuture
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // 如果注册已经完成,则创建一个新的ChannelPromise,并执行绑定操作
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // 如果注册尚未完成,则创建一个PendingRegistrationPromise,并添加一个监听器等待注册完成后再执行绑定操作
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // 如果注册过程中发生异常,则直接设置失败状态
                    promise.setFailure(cause);
                } else {
                    // 注册成功后执行绑定操作
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

这段代码的作用是执行绑定操作,并返回一个与绑定操作相关的ChannelFuture。


initAndRegister()

代码语言:javascript
复制
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 使用channelFactory创建一个新的Channel实例
        channel = channelFactory.newChannel();
        // 对新创建的Channel进行初始化
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // 如果初始化过程中发生异常,关闭Channel
            channel.unsafe().closeForcibly();
            // 创建一个新的ChannelPromise,并设置失败状态
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // 如果channel为null,则创建一个FailedChannel实例,并设置失败状态
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 使用ChannelConfig的EventLoopGroup进行注册
    ChannelFuture regFuture = config().group().register(channel);
    // 如果注册过程中发生异常,则关闭Channel
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // 返回注册的ChannelFuture
    return regFuture;
}

创建一个新的Channel实例并对其进行初始化,然后使用EventLoopGroup将其注册到事件循环中。最后返回一个与注册操作相关的ChannelFuture。


channelFactory.newChannel()

channelFactory.newChannel() 中的实现,请移步 Netty Review - NioServerSocketChannel源码分析

init(channel)

代码语言:javascript
复制
@Override
void init(Channel channel) throws Exception {
    // 设置Channel的选项
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // 设置Channel的属性
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    // 获取Channel的Pipeline
    ChannelPipeline p = channel.pipeline();

    // 复制当前的子组、子处理器、子选项和子属性
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    // 向Channel的Pipeline中添加一个ChannelInitializer
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                // 添加用户配置的处理器到Pipeline中
                pipeline.addLast(handler);
            }

            // 在Channel的事件循环中执行
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 添加一个ServerBootstrapAcceptor到Pipeline中,用于接收新连接
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

init()方法的作用是初始化Channel,设置Channel的选项和属性,然后向ChannelPipeline中添加一个ChannelInitializer,该InitializerChannel的事件循环中执行,并向Pipeline中添加一个ServerBootstrapAcceptor,用于接收新连接。


config().group().register(channel)

代码语言:javascript
复制
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

next()

代码语言:javascript
复制
io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#next
代码语言:javascript
复制
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    // 使用原子整数来维护索引,以确保在多线程环境中安全地获取下一个 EventExecutor 实例。
    private final AtomicInteger idx = new AtomicInteger();
    // 存储所有可用的 EventExecutor 实例的数组。
    private final EventExecutor[] executors;

    // 构造方法,接收一个 EventExecutor 实例数组作为参数,并将其存储在 executors 成员变量中。
    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    // 选择下一个要使用的 EventExecutor 实例。
    // 通过对索引进行按位与操作(idx.getAndIncrement() & executors.length - 1),
    // 来确保索引始终在 executors 数组的有效范围内。
    // 由于 executors.length 必须是 2 的幂次方,因此使用按位与运算(&)可以有效地实现取模操作,
    // 从而将索引限制在数组长度范围内。
    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

代码语言:javascript
复制
@Override
public ChannelFuture register(final ChannelPromise promise) {
    // 检查传入的 promise 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。
    ObjectUtil.checkNotNull(promise, "promise");
    // 获取与该 promise 关联的 Channel 实例,通过 unsafe() 方法获取 Channel 的 Unsafe 实例,然后调用 register() 方法注册 Channel。
    // 这里调用的是 unsafe() 方法,表示使用一种不安全的方式直接注册 Channel,而不经过 EventLoop 的事件循环。
    // register() 方法将 Channel 注册到当前 EventLoop,由 EventLoop 负责管理该 Channel。
    promise.channel().unsafe().register(this, promise);
    // 返回传入的 promise 对象,即注册 Channel 的异步结果。
    return promise;
}

AbstractUnsafe#register

代码语言:javascript
复制
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 检查 eventLoop 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    // 检查 Channel 是否已经注册到某个 EventLoop,如果已经注册,则设置 promise 的失败状态,并返回。
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    // 检查传入的 eventLoop 是否与当前 Channel 兼容,如果不兼容,则设置 promise 的失败状态,并返回。
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    // 将当前 Channel 的 eventLoop 属性设置为传入的 eventLoop。
    AbstractChannel.this.eventLoop = eventLoop;

    // 判断当前线程是否在 eventLoop 的事件循环中,如果是,则直接调用 register0() 方法进行注册。
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        // 如果当前线程不在 eventLoop 的事件循环中,则通过 eventLoop.execute() 方法提交一个任务,让 eventLoop 执行注册操作。
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // 如果无法将注册任务提交给 eventLoop 执行,则强制关闭 Channel,并设置 promise 的失败状态。
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
代码语言:javascript
复制
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 检查 eventLoop 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    // 检查 Channel 是否已经注册到某个 EventLoop,如果已经注册,则设置 promise 的失败状态,并返回。
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    // 检查传入的 eventLoop 是否与当前 Channel 兼容,如果不兼容,则设置 promise 的失败状态,并返回。
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    // 将当前 Channel 的 eventLoop 属性设置为传入的 eventLoop。
    AbstractChannel.this.eventLoop = eventLoop;

    // 判断当前线程是否在 eventLoop 的事件循环中,如果是,则直接调用 register0() 方法进行注册。
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        // 如果当前线程不在 eventLoop 的事件循环中,则通过 eventLoop.execute() 方法提交一个任务,让 eventLoop 执行注册操作。
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // 如果无法将注册任务提交给 eventLoop 执行,则强制关闭 Channel,并设置 promise 的失败状态。
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

进入到异步这里

代码语言:javascript
复制
 try {
       eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
       ....
    }

eventLoop.execute

代码语言:javascript
复制
@Override
public void execute(Runnable task) {
    // 检查任务是否为null
    if (task == null) {
        throw new NullPointerException("task");
    }

    // 判断当前线程是否在EventLoop的事件循环中
    boolean inEventLoop = inEventLoop();
    
    // 将任务添加到任务队列中
    addTask(task);
    
    // 如果当前线程不在EventLoop的事件循环中
    if (!inEventLoop) {
        // 启动一个新的线程来执行任务
        startThread();
        
        // 如果EventLoop已经被关闭
        if (isShutdown()) {
            boolean reject = false;
            try {
                // 检查任务是否可以从任务队列中移除
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // 任务队列不支持移除操作,直接忽略异常,希望在任务完全终止之前能够拾取到任务
                // 最坏的情况下,在终止时进行记录
            }
            
            // 如果需要拒绝执行该任务
            if (reject) {
                reject();
            }
        }
    }

    // 如果不需要唤醒EventLoop来处理任务
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        // 如果当前线程不在EventLoop的事件循环中,则唤醒EventLoop来处理任务
        wakeup(inEventLoop);
    }
}

addTask(task);

代码语言:javascript
复制
/**
 * 将任务添加到任务队列中,如果实例在关闭之前被关闭,则抛出{@link RejectedExecutionException}。
 */
protected void addTask(Runnable task) {
    // 检查任务是否为null
    if (task == null) {
        throw new NullPointerException("task");
    }
    // 如果无法将任务添加到队列中,则拒绝执行该任务
    if (!offerTask(task)) {
        reject(task);
    }
}

startThread();

代码语言:javascript
复制
private void startThread() {
    // 如果线程状态为未启动
    if (state == ST_NOT_STARTED) {
        // 尝试将状态从未启动更改为已启动
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
                // 启动线程
                doStartThread();
            } catch (Throwable cause) {
                // 如果启动线程时发生异常,则将状态重置为未启动,并抛出异常
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}

doStartThread()

代码语言:javascript
复制
private void doStartThread() {
    // 确保线程为空
    assert thread == null;
    // 在执行器上执行一个新的 Runnable
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 将当前线程设置为执行线程
            thread = Thread.currentThread();
            // 如果标记为已中断,则中断线程
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            // 更新上次执行时间
            updateLastExecutionTime();
            try {
                // 运行单线程事件执行器的主要逻辑
                SingleThreadEventExecutor.this.run();
                // 执行成功
                success = true;
            } catch (Throwable t) {
                // 捕获并记录执行期间的异常
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // 循环直到能够安全关闭
                for (;;) {
                    int oldState = state;
                    // 尝试将状态更改为正在关闭
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // 检查是否在循环结束时调用了 confirmShutdown()
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }

                try {
                    // 运行所有剩余任务和关闭钩子
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        // 清理资源
                        cleanup();
                    } finally {
                        // 移除线程上的所有 FastThreadLocals,因为线程即将终止,并通知未来。
                        // 用户可能会在未来上阻塞,一旦解除阻塞,JVM 可能会终止并开始卸载类。
                        // 详情请参阅 https://github.com/netty/netty/issues/6596。
                        FastThreadLocal.removeAll();

                        // 设置执行器状态为终止
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        // 释放线程锁
                        threadLock.release();
                        // 如果任务队列不为空,则记录警告日志
                        if (!taskQueue.isEmpty()) {
                            if (logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + taskQueue.size() + ')');
                            }
                        }
                        // 设置终止未来为成功
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

SingleThreadEventExecutor.this.run()

代码语言:javascript
复制
@Override
protected void run() {
	// for死循环
    for (;;) {
        try {
            try {
                // 根据选择策略计算下一步的操作
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        // 继续循环,不进行任何操作
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // 忙等待策略,由于 NIO 不支持忙等待,因此执行 select 操作
                        // 不会直接执行 select 方法,而是会先设置 wakenUp 为 false,然后执行 select 方法
                        // 如果 wakenUp 为 true,则再次唤醒 Selector
                        // 这样做是为了减少唤醒 Selector 的开销
                        // 但是存在一个竞态条件:如果 wakenUp 在 select 之前设置为 true,则会导致不必要的唤醒
                        // 因此在 select 之后需要再次判断 wakenUp 是否为 true,如果是则再次唤醒 Selector
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // 继续执行后续操作
                    default:
                }
            } catch (IOException e) {
                // 如果在这里收到 IOException,则表示 Selector 出现问题,需要重建 Selector 并重试
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                // 如果 ioRatio 为 100%,则优先处理所有的 IO 事件,然后再执行所有任务
                try {
                    processSelectedKeys();
                } finally {
                    // 确保始终运行所有任务
                    runAllTasks();
                }
            } else {
                // 如果 ioRatio 不为 100%,则按比例处理 IO 事件和任务
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // 确保始终运行所有任务
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            // 处理循环中的异常
            handleLoopException(t);
        }
        // 即使循环处理中抛出异常,也始终处理关闭
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            // 处理关闭过程中的异常
            handleLoopException(t);
        }
    }
}

select(wakenUp.getAndSet(false))

代码语言:javascript
复制
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                // 如果超时时间小于等于 0,则立即进行非阻塞的 selectNow 操作
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // 如果有任务并且 wakenUp 的值为 true,则立即进行非阻塞的 selectNow 操作
            // 这是为了确保任务在 select 操作之前已经被执行
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            // 执行阻塞的 select 操作,并记录选择的次数
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt++;

            // 如果 select 操作返回了结果,或者已经被唤醒,或者有任务待执行,则立即退出循环
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }

            // 如果当前线程被中断,则重置选择的键并退出循环
            if (Thread.interrupted()) {
                selectCnt = 1;
                break;
            }

            // 更新当前时间,并判断是否需要继续循环
            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // 如果 select 次数超过阈值,则重建 Selector
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        // 如果 select 次数超过了预设的阈值,则输出警告日志
        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                    selectCnt - 1, selector);
        }
    } catch (CancelledKeyException e) {
        // 取消键异常通常是无害的,只输出调试日志即可
        logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                selector, e);
    }
}

int selectedKeys = selector.select(timeoutMillis);

熟悉的NIO代码。


processSelectedKeys()

代码语言:javascript
复制
private void processSelectedKeys() {
   // 如果使用了优化的 selectedKeys 集合,则调用优化过的处理方法
   if (selectedKeys != null) {
       processSelectedKeysOptimized();
   } else {
       // 否则,调用普通的处理方法并传入 selector.selectedKeys() 作为参数
       processSelectedKeysPlain(selector.selectedKeys());
   }
}
代码语言:javascript
复制
private void processSelectedKeysOptimized() {
    // 遍历优化过的 selectedKeys 集合
    for (int i = 0; i < selectedKeys.size; ++i) {
        // 获取当前索引处的 SelectionKey
        final SelectionKey k = selectedKeys.keys[i];
        // 将数组中的该元素置为 null,以便在 Channel 关闭后可以被垃圾回收
        selectedKeys.keys[i] = null;

        // 获取 SelectionKey 对应的附件
        final Object a = k.attachment();

        // 如果附件是 AbstractNioChannel 类型,则调用 processSelectedKey 方法处理
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // 否则,认为附件是 NioTask 类型,将其转换为 NioTask<SelectableChannel> 并调用 processSelectedKey 方法处理
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        // 如果需要再次进行 select,则重置 selectedKeys 集合并再次进行 select
        if (needsToSelectAgain) {
            // 将数组中的剩余元素置为 null,以便在 Channel 关闭后可以被垃圾回收
            selectedKeys.reset(i + 1);

            // 再次进行 select 操作
            selectAgain();
            // 重置索引为 -1,使循环从 0 开始
            i = -1;
        }
    }
}
代码语言:javascript
复制
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // 获取与 Channel 相关联的 NioUnsafe 对象
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 如果 SelectionKey 不再有效,则关闭 Channel
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // 如果 Channel 实现抛出异常,表示没有事件循环,我们忽略此异常
            // 因为我们只想确定 ch 是否注册到此事件循环,因此具有关闭 ch 的权限
            return;
        }
        // 只有在 ch 仍然注册到此 EventLoop 时才关闭 ch
        // 如果 ch 已从事件循环中注销,则 SelectionKey 可能作为注销过程的一部分被取消注册
        // 但是通道仍然健康且不应关闭。
        // 详见 https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // 关闭 Channel
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // 如果 OP_CONNECT 位被设置,则调用 finishConnect() 完成连接
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // 移除 OP_CONNECT 位,否则 Selector.select(..) 将总是立即返回而不阻塞
            // 详见 https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            // 完成连接
            unsafe.finishConnect();
        }

        // 先处理 OP_WRITE,因为我们可能能够写入一些排队的缓冲区,从而释放内存
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // 调用 forceFlush() 方法,该方法还会负责在没有剩余可写内容时清除 OP_WRITE 位
            ch.unsafe().forceFlush();
        }

        // 如果 OP_READ 或 OP_ACCEPT 被设置,或者 readyOps 为 0,则进行读取操作
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        // 如果出现 CancelledKeyException 异常,则关闭 Channel
        unsafe.close(unsafe.voidPromise());
    }
}

runAllTasks()

代码语言:javascript
复制
/**
 * 从任务队列中获取并运行所有任务。
 * 
 * @return 如果至少运行了一个任务,则返回 true
 */
protected boolean runAllTasks() {
    // 断言当前线程在事件循环中
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        // 从计划任务队列中获取任务
        fetchedAll = fetchFromScheduledTaskQueue();
        // 运行任务队列中的所有任务
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // 继续处理,直到获取所有计划任务。

    // 如果至少运行了一个任务,则更新最后执行时间
    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    // 在运行完所有任务后执行的操作
    afterRunningAllTasks();
    return ranAtLeastOne;
}
代码语言:javascript
复制
/**
 * 从传入的任务队列中运行所有任务。
 *
 * @param taskQueue 要轮询和执行所有任务的任务队列。
 *
 * @return 如果至少执行了一个任务,则返回 true。
 */
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    // 从任务队列中轮询出一个任务
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        // 执行任务
        safeExecute(task);
        // 继续轮询下一个任务
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            return true;
        }
    }
}
代码语言:javascript
复制
/**
 * 从任务队列中轮询出一个任务。
 *
 * @param taskQueue 要轮询的任务队列。
 *
 * @return 任务队列中的下一个任务,如果没有任务则返回 null。
 */
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) {
        // 从任务队列中轮询出一个任务
        Runnable task = taskQueue.poll();
        // 如果轮询出来的任务是 WAKEUP_TASK,则继续轮询下一个任务
        if (task == WAKEUP_TASK) {
            continue;
        }
        return task;
    }
}

取出来的任务就是


执行队列中的任务 AbstractUnsafe#register0

代码语言:javascript
复制
private void register0(ChannelPromise promise) {
    try {
        // 检查通道是否仍然打开,因为在注册调用之外的时间内,通道可能已关闭
        // 设置 ChannelPromise 为不可取消状态,以确保注册成功后无法取消
        // 并且检查通道是否仍然打开
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        // 判断是否是第一次注册
        boolean firstRegistration = neverRegistered;
        // 执行具体的注册逻辑
        doRegister();
        // 标记通道已经注册过
        neverRegistered = false;
        registered = true;

        // 在实际通知 promise 之前,确保先调用 handlerAdded(...) 方法。这是必要的,因为用户可能已经通过管道触发了事件。
        pipeline.invokeHandlerAddedIfNeeded();

        // 设置注册成功,并通知 ChannelPromise
        safeSetSuccess(promise);
        // 触发 ChannelRegistered 事件,通知管道上下文
        pipeline.fireChannelRegistered();
        // 只有在通道尚未激活并且已经注册过一次时才触发 channelActive 事件,避免重复触发
        if (isActive()) {
            if (firstRegistration) {
                // 如果是第一次注册,则触发 channelActive 事件
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // 如果通道已经注册过且设置了自动读取,则重新开始读取,以便处理传入数据
                beginRead();
            }
        }
    } catch (Throwable t) {
        // 发生异常时,直接关闭通道以避免 FD 泄漏
        closeForcibly();
        closeFuture.setClosed();
        // 设置注册失败,并通知 ChannelPromise
        safeSetFailure(promise, t);
    }
}

注册 doRegister()

doRegister();

代码语言:javascript
复制
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 尝试将通道注册到 EventLoop 的 Selector 上,关注的事件为 0,表示不关注任何事件
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // 强制 Selector 立即执行 select 操作,因为 "canceled" SelectionKey 可能仍然被缓存,尚未移除,因为尚未调用 Select.select(..) 操作。
                eventLoop().selectNow();
                selected = true;
            } else {
                // 我们在之前已经强制执行了一次选择操作,但是 SelectionKey 仍然因为某种原因被缓存了,可能是 JDK 的 bug
                throw e;
            }
        }
    }
}
代码语言:javascript
复制
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 尝试将通道注册到 EventLoop 的 Selector 上,关注的事件为 0,表示不关注任何事件
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // 强制 Selector 立即执行 select 操作,因为 "canceled" SelectionKey 可能仍然被缓存,尚未移除,因为尚未调用 Select.select(..) 操作。
                eventLoop().selectNow();
                selected = true;
            } else {
                // 我们在之前已经强制执行了一次选择操作,但是 SelectionKey 仍然因为某种原因被缓存了,可能是 JDK 的 bug
                throw e;
            }
        }
    }
}

熟悉的NIO代码

代码语言:javascript
复制
javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

pipeline.invokeHandlerAddedIfNeeded();

代码语言:javascript
复制
final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // 已经注册到 EventLoop。现在是时候调用在注册完成之前添加的 ChannelHandlers 的回调方法了。
        callHandlerAddedForAllHandlers();
    }
}
代码语言:javascript
复制
private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // 当前 Channel 已经注册。
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // 置空以便垃圾回收。
        this.pendingHandlerCallbackHead = null;
    }

    // 必须在 synchronized(...) 块之外执行,否则 handlerAdded(...) 可能在持有锁的情况下被调用,
    // 如果 handlerAdded(...) 尝试从 EventLoop 外部添加另一个处理程序,则可能产生死锁。
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}
代码语言:javascript
复制
@Override
void execute() {
    EventExecutor executor = ctx.executor();
    if (executor.inEventLoop()) {
        // 如果当前线程在事件循环中,则直接调用 handlerAdded0(ctx)。
        callHandlerAdded0(ctx);
    } else {
        try {
            // 否则,提交任务到事件执行器中。
            executor.execute(this);
        } catch (RejectedExecutionException e) {
            // 如果任务被拒绝,则从处理程序链中移除该处理程序。
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                        executor, ctx.name(), e);
            }
            remove0(ctx);
            ctx.setRemoved();
        }
    }
}
代码语言:javascript
复制
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        // 调用处理程序的 handlerAdded 方法。
        ctx.callHandlerAdded();
    } catch (Throwable t) {
        boolean removed = false;
        try {
            // 如果处理程序添加失败,尝试将其从管道中移除。
            remove0(ctx);
            ctx.callHandlerRemoved();
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }

        // 如果处理程序添加失败且已成功移除,则触发异常捕获事件,报告移除异常。
        // 否则,触发异常捕获事件,报告添加异常和移除异常。
        if (removed) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; removed.", t));
        } else {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; also failed to remove.", t));
        }
    }
}
代码语言:javascript
复制
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        // 调用处理程序的 handlerAdded 方法。
        ctx.callHandlerAdded();
    } catch (Throwable t) {
        boolean removed = false;
        try {
            // 如果处理程序添加失败,尝试将其从管道中移除。
            remove0(ctx);
            ctx.callHandlerRemoved();
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }

        // 如果处理程序添加失败且已成功移除,则触发异常捕获事件,报告移除异常。
        // 否则,触发异常捕获事件,报告添加异常和移除异常。
        if (removed) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; removed.", t));
        } else {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; also failed to remove.", t));
        }
    }
}
代码语言:javascript
复制
/**
 * {@inheritDoc} 如果重写此方法,请确保调用 super!
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // 这应该总是为 true,根据我们当前的 DefaultChannelPipeline 实现。
        // 在 handlerAdded(...) 中调用 initChannel(...) 的好处是,如果 ChannelInitializer 添加了另一个 ChannelInitializer,
        // 不会出现排序意外。这是因为所有处理程序都将按预期的顺序添加。
        if (initChannel(ctx)) {

            // 完成 Channel 的初始化,现在移除初始化程序。
            removeState(ctx);
        }
    }
}
代码语言:javascript
复制
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // 防止重入。
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // 在调用 initChannel(...) 之前显式调用 exceptionCaught(...)。
            // 我们这样做是为了防止多次调用 initChannel(...)。
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
代码语言:javascript
复制
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // 防止重入。
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // 在调用 initChannel(...) 之前显式调用 exceptionCaught(...)。
            // 我们这样做是为了防止多次调用 initChannel(...)。
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
代码语言:javascript
复制
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        // 获取管道
        final ChannelPipeline pipeline = ch.pipeline();
        // 获取配置的处理程序
        ChannelHandler handler = config.handler();
        // 如果存在处理程序,则将其添加到管道中
        if (handler != null) {
            pipeline.addLast(handler);
        }

        // 通过通道的事件循环执行一个新的Runnable任务
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // 添加一个新的 ServerBootstrapAcceptor 到管道中
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});
代码语言:javascript
复制
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        // 获取管道
        final ChannelPipeline pipeline = ch.pipeline();
        // 获取配置的处理程序
        ChannelHandler handler = config.handler();
        // 如果存在处理程序,则将其添加到管道中
        if (handler != null) {
            pipeline.addLast(handler);
        }

        // 通过通道的事件循环执行一个新的Runnable任务
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // 添加一个新的 ServerBootstrapAcceptor 到管道中
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

pipeline.fireChannelRegistered();

代码语言:javascript
复制
@Override
public final ChannelPipeline fireChannelRegistered() {
   // 通过头部上下文触发通道注册事件
   AbstractChannelHandlerContext.invokeChannelRegistered(head);
   return this;
}
代码语言:javascript
复制
/**
 * 通过给定的下一个上下文触发通道注册事件,如果在事件循环中,直接调用,否则提交到事件执行器中执行。
 *
 * @param next 要触发通道注册事件的下一个上下文
 */
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 如果在事件循环中,直接调用通道注册事件
        next.invokeChannelRegistered();
    } else {
        // 否则,提交到事件执行器中执行
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}
代码语言:javascript
复制
/**
 * 调用通道注册事件处理程序。
 * 如果处理程序被调用成功,则调用通道注册方法;否则,继续触发下一个上下文的通道注册事件。
 */
private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            // 如果处理程序被调用成功,则调用通道注册方法
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            // 处理程序调用时发生异常,通知异常处理程序
            notifyHandlerException(t);
        }
    } else {
        // 处理程序未被调用成功,继续触发下一个上下文的通道注册事件
        fireChannelRegistered();
    }
}

pipeline.fireChannelActive();

代码语言:javascript
复制
if (isActive()) {
    if (firstRegistration) {
        // 如果是首次注册,则触发通道激活事件
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        // 如果通道已经注册过,并且设置了自动读取,则重新开始读取以处理传入数据
        beginRead();
    }
}
代码语言:javascript
复制
@Override
public final ChannelPipeline fireChannelActive() {
    // 调用头部上下文的 invokeChannelActive 方法来触发 ChannelActive 事件
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}
代码语言:javascript
复制
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    // 如果当前线程是事件循环线程,则直接调用 invokeChannelActive 方法
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        // 否则,通过事件执行器异步执行 invokeChannelActive 方法
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

源码流程图

图都给你画好了,戳这里

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-02-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Pre
  • Netty主从Reactor线程模型
  • 服务端channel注册流程
  • 源码解读
    • 入口 serverBootstrap.bind(port)
      • 执行队列中的任务 AbstractUnsafe#register0
        • 注册 doRegister()
        • 源码流程图
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档