Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析
Netty 使用主从 Reactor 线程模型来处理并发连接和网络事件。
在 Netty 中,通常有两种类型的线程池:
主从 Reactor 线程模型的工作流程如下:
通过主从 Reactor 线程模型,Netty 可以高效地处理大量的并发连接和网络事件,提高了网络应用程序的性能和可扩展性。
在Netty中,服务端Channel注册流程涉及以下几个关键步骤:
通过以上流程,服务端Channel在Netty中的注册过程就完成了,它可以接受客户端的连接,并将连接注册到EventLoop上进行事件处理。
当我们梳理完
Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析
接下来让我们从下面这一行代码开始
ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
channelFuture.channel().closeFuture().sync();
这段代码用于启动服务端并阻塞当前线程直到服务端关闭。
serverBootstrap.bind(9000)
:调用serverBootstrap
的bind()
方法绑定端口9000,并返回一个ChannelFuture
对象,表示绑定操作的异步结果。
.sync()
:调用sync()
方法阻塞当前线程,直到绑定操作完成。这样做是为了确保服务端在端口绑定完成后再继续执行后续代码。
channelFuture.channel().closeFuture().sync()
:获取channelFuture
中的channel()
,然后调用其closeFuture()
方法获取一个表示关闭操作的ChannelFuture
对象。接着,再次调用sync()
方法阻塞当前线程,直到关闭操作完成。这样做是为了让当前线程一直等待直到服务端关闭。
这段代码是bind(int inetPort)
方法的实现。
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(int inetPort) {
// 调用bind方法,传入一个InetSocketAddress对象,该对象使用指定的端口号创建
return bind(new InetSocketAddress(inetPort));
}
创建一个新的Channel并绑定到指定的端口。
doBind(final SocketAddress localAddress)
这段代码是doBind(final SocketAddress localAddress)
方法的实现。
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化并注册Channel,并返回一个ChannelFuture
final ChannelFuture regFuture = initAndRegister();
// 获取注册完成的Channel
final Channel channel = regFuture.channel();
// 如果注册过程中发生异常,则直接返回注册的ChannelFuture
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// 如果注册已经完成,则创建一个新的ChannelPromise,并执行绑定操作
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 如果注册尚未完成,则创建一个PendingRegistrationPromise,并添加一个监听器等待注册完成后再执行绑定操作
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// 如果注册过程中发生异常,则直接设置失败状态
promise.setFailure(cause);
} else {
// 注册成功后执行绑定操作
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
这段代码的作用是执行绑定操作,并返回一个与绑定操作相关的ChannelFuture。
initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 使用channelFactory创建一个新的Channel实例
channel = channelFactory.newChannel();
// 对新创建的Channel进行初始化
init(channel);
} catch (Throwable t) {
if (channel != null) {
// 如果初始化过程中发生异常,关闭Channel
channel.unsafe().closeForcibly();
// 创建一个新的ChannelPromise,并设置失败状态
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 如果channel为null,则创建一个FailedChannel实例,并设置失败状态
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 使用ChannelConfig的EventLoopGroup进行注册
ChannelFuture regFuture = config().group().register(channel);
// 如果注册过程中发生异常,则关闭Channel
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// 返回注册的ChannelFuture
return regFuture;
}
创建一个新的Channel实例并对其进行初始化,然后使用EventLoopGroup将其注册到事件循环中。最后返回一个与注册操作相关的ChannelFuture。
channelFactory.newChannel()
channelFactory.newChannel() 中的实现,请移步 Netty Review - NioServerSocketChannel源码分析
init(channel)
@Override
void init(Channel channel) throws Exception {
// 设置Channel的选项
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 设置Channel的属性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 获取Channel的Pipeline
ChannelPipeline p = channel.pipeline();
// 复制当前的子组、子处理器、子选项和子属性
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
// 向Channel的Pipeline中添加一个ChannelInitializer
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加用户配置的处理器到Pipeline中
pipeline.addLast(handler);
}
// 在Channel的事件循环中执行
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加一个ServerBootstrapAcceptor到Pipeline中,用于接收新连接
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init()
方法的作用是初始化Channel
,设置Channel
的选项和属性,然后向Channel
的Pipeline
中添加一个ChannelInitializer
,该Initializer
在Channel
的事件循环中执行,并向Pipeline
中添加一个ServerBootstrapAcceptor
,用于接收新连接。
config().group().register(channel)
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
next()
io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#next
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
// 使用原子整数来维护索引,以确保在多线程环境中安全地获取下一个 EventExecutor 实例。
private final AtomicInteger idx = new AtomicInteger();
// 存储所有可用的 EventExecutor 实例的数组。
private final EventExecutor[] executors;
// 构造方法,接收一个 EventExecutor 实例数组作为参数,并将其存储在 executors 成员变量中。
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
// 选择下一个要使用的 EventExecutor 实例。
// 通过对索引进行按位与操作(idx.getAndIncrement() & executors.length - 1),
// 来确保索引始终在 executors 数组的有效范围内。
// 由于 executors.length 必须是 2 的幂次方,因此使用按位与运算(&)可以有效地实现取模操作,
// 从而将索引限制在数组长度范围内。
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
// 检查传入的 promise 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。
ObjectUtil.checkNotNull(promise, "promise");
// 获取与该 promise 关联的 Channel 实例,通过 unsafe() 方法获取 Channel 的 Unsafe 实例,然后调用 register() 方法注册 Channel。
// 这里调用的是 unsafe() 方法,表示使用一种不安全的方式直接注册 Channel,而不经过 EventLoop 的事件循环。
// register() 方法将 Channel 注册到当前 EventLoop,由 EventLoop 负责管理该 Channel。
promise.channel().unsafe().register(this, promise);
// 返回传入的 promise 对象,即注册 Channel 的异步结果。
return promise;
}
AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 检查 eventLoop 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
// 检查 Channel 是否已经注册到某个 EventLoop,如果已经注册,则设置 promise 的失败状态,并返回。
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
// 检查传入的 eventLoop 是否与当前 Channel 兼容,如果不兼容,则设置 promise 的失败状态,并返回。
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 将当前 Channel 的 eventLoop 属性设置为传入的 eventLoop。
AbstractChannel.this.eventLoop = eventLoop;
// 判断当前线程是否在 eventLoop 的事件循环中,如果是,则直接调用 register0() 方法进行注册。
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// 如果当前线程不在 eventLoop 的事件循环中,则通过 eventLoop.execute() 方法提交一个任务,让 eventLoop 执行注册操作。
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 如果无法将注册任务提交给 eventLoop 执行,则强制关闭 Channel,并设置 promise 的失败状态。
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 检查 eventLoop 参数是否为 null,如果为 null,则抛出 NullPointerException 异常。
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
// 检查 Channel 是否已经注册到某个 EventLoop,如果已经注册,则设置 promise 的失败状态,并返回。
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
// 检查传入的 eventLoop 是否与当前 Channel 兼容,如果不兼容,则设置 promise 的失败状态,并返回。
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 将当前 Channel 的 eventLoop 属性设置为传入的 eventLoop。
AbstractChannel.this.eventLoop = eventLoop;
// 判断当前线程是否在 eventLoop 的事件循环中,如果是,则直接调用 register0() 方法进行注册。
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// 如果当前线程不在 eventLoop 的事件循环中,则通过 eventLoop.execute() 方法提交一个任务,让 eventLoop 执行注册操作。
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 如果无法将注册任务提交给 eventLoop 执行,则强制关闭 Channel,并设置 promise 的失败状态。
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
进入到异步这里
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
....
}
eventLoop.execute
@Override
public void execute(Runnable task) {
// 检查任务是否为null
if (task == null) {
throw new NullPointerException("task");
}
// 判断当前线程是否在EventLoop的事件循环中
boolean inEventLoop = inEventLoop();
// 将任务添加到任务队列中
addTask(task);
// 如果当前线程不在EventLoop的事件循环中
if (!inEventLoop) {
// 启动一个新的线程来执行任务
startThread();
// 如果EventLoop已经被关闭
if (isShutdown()) {
boolean reject = false;
try {
// 检查任务是否可以从任务队列中移除
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// 任务队列不支持移除操作,直接忽略异常,希望在任务完全终止之前能够拾取到任务
// 最坏的情况下,在终止时进行记录
}
// 如果需要拒绝执行该任务
if (reject) {
reject();
}
}
}
// 如果不需要唤醒EventLoop来处理任务
if (!addTaskWakesUp && wakesUpForTask(task)) {
// 如果当前线程不在EventLoop的事件循环中,则唤醒EventLoop来处理任务
wakeup(inEventLoop);
}
}
addTask(task);
/**
* 将任务添加到任务队列中,如果实例在关闭之前被关闭,则抛出{@link RejectedExecutionException}。
*/
protected void addTask(Runnable task) {
// 检查任务是否为null
if (task == null) {
throw new NullPointerException("task");
}
// 如果无法将任务添加到队列中,则拒绝执行该任务
if (!offerTask(task)) {
reject(task);
}
}
startThread();
private void startThread() {
// 如果线程状态为未启动
if (state == ST_NOT_STARTED) {
// 尝试将状态从未启动更改为已启动
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
// 启动线程
doStartThread();
} catch (Throwable cause) {
// 如果启动线程时发生异常,则将状态重置为未启动,并抛出异常
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
doStartThread()
private void doStartThread() {
// 确保线程为空
assert thread == null;
// 在执行器上执行一个新的 Runnable
executor.execute(new Runnable() {
@Override
public void run() {
// 将当前线程设置为执行线程
thread = Thread.currentThread();
// 如果标记为已中断,则中断线程
if (interrupted) {
thread.interrupt();
}
boolean success = false;
// 更新上次执行时间
updateLastExecutionTime();
try {
// 运行单线程事件执行器的主要逻辑
SingleThreadEventExecutor.this.run();
// 执行成功
success = true;
} catch (Throwable t) {
// 捕获并记录执行期间的异常
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 循环直到能够安全关闭
for (;;) {
int oldState = state;
// 尝试将状态更改为正在关闭
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// 检查是否在循环结束时调用了 confirmShutdown()
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// 运行所有剩余任务和关闭钩子
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
// 清理资源
cleanup();
} finally {
// 移除线程上的所有 FastThreadLocals,因为线程即将终止,并通知未来。
// 用户可能会在未来上阻塞,一旦解除阻塞,JVM 可能会终止并开始卸载类。
// 详情请参阅 https://github.com/netty/netty/issues/6596。
FastThreadLocal.removeAll();
// 设置执行器状态为终止
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
// 释放线程锁
threadLock.release();
// 如果任务队列不为空,则记录警告日志
if (!taskQueue.isEmpty()) {
if (logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
}
// 设置终止未来为成功
terminationFuture.setSuccess(null);
}
}
}
}
});
}
SingleThreadEventExecutor.this.run()
@Override
protected void run() {
// for死循环
for (;;) {
try {
try {
// 根据选择策略计算下一步的操作
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
// 继续循环,不进行任何操作
continue;
case SelectStrategy.BUSY_WAIT:
// 忙等待策略,由于 NIO 不支持忙等待,因此执行 select 操作
// 不会直接执行 select 方法,而是会先设置 wakenUp 为 false,然后执行 select 方法
// 如果 wakenUp 为 true,则再次唤醒 Selector
// 这样做是为了减少唤醒 Selector 的开销
// 但是存在一个竞态条件:如果 wakenUp 在 select 之前设置为 true,则会导致不必要的唤醒
// 因此在 select 之后需要再次判断 wakenUp 是否为 true,如果是则再次唤醒 Selector
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// 继续执行后续操作
default:
}
} catch (IOException e) {
// 如果在这里收到 IOException,则表示 Selector 出现问题,需要重建 Selector 并重试
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
// 如果 ioRatio 为 100%,则优先处理所有的 IO 事件,然后再执行所有任务
try {
processSelectedKeys();
} finally {
// 确保始终运行所有任务
runAllTasks();
}
} else {
// 如果 ioRatio 不为 100%,则按比例处理 IO 事件和任务
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 确保始终运行所有任务
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
// 处理循环中的异常
handleLoopException(t);
}
// 即使循环处理中抛出异常,也始终处理关闭
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
// 处理关闭过程中的异常
handleLoopException(t);
}
}
}
select(wakenUp.getAndSet(false))
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
// 如果超时时间小于等于 0,则立即进行非阻塞的 selectNow 操作
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果有任务并且 wakenUp 的值为 true,则立即进行非阻塞的 selectNow 操作
// 这是为了确保任务在 select 操作之前已经被执行
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 执行阻塞的 select 操作,并记录选择的次数
int selectedKeys = selector.select(timeoutMillis);
selectCnt++;
// 如果 select 操作返回了结果,或者已经被唤醒,或者有任务待执行,则立即退出循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
// 如果当前线程被中断,则重置选择的键并退出循环
if (Thread.interrupted()) {
selectCnt = 1;
break;
}
// 更新当前时间,并判断是否需要继续循环
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 如果 select 次数超过阈值,则重建 Selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
// 如果 select 次数超过了预设的阈值,则输出警告日志
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
} catch (CancelledKeyException e) {
// 取消键异常通常是无害的,只输出调试日志即可
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
}
int selectedKeys = selector.select(timeoutMillis);
熟悉的NIO代码。
processSelectedKeys()
private void processSelectedKeys() {
// 如果使用了优化的 selectedKeys 集合,则调用优化过的处理方法
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
// 否则,调用普通的处理方法并传入 selector.selectedKeys() 作为参数
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
// 遍历优化过的 selectedKeys 集合
for (int i = 0; i < selectedKeys.size; ++i) {
// 获取当前索引处的 SelectionKey
final SelectionKey k = selectedKeys.keys[i];
// 将数组中的该元素置为 null,以便在 Channel 关闭后可以被垃圾回收
selectedKeys.keys[i] = null;
// 获取 SelectionKey 对应的附件
final Object a = k.attachment();
// 如果附件是 AbstractNioChannel 类型,则调用 processSelectedKey 方法处理
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
// 否则,认为附件是 NioTask 类型,将其转换为 NioTask<SelectableChannel> 并调用 processSelectedKey 方法处理
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 如果需要再次进行 select,则重置 selectedKeys 集合并再次进行 select
if (needsToSelectAgain) {
// 将数组中的剩余元素置为 null,以便在 Channel 关闭后可以被垃圾回收
selectedKeys.reset(i + 1);
// 再次进行 select 操作
selectAgain();
// 重置索引为 -1,使循环从 0 开始
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 获取与 Channel 相关联的 NioUnsafe 对象
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 如果 SelectionKey 不再有效,则关闭 Channel
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// 如果 Channel 实现抛出异常,表示没有事件循环,我们忽略此异常
// 因为我们只想确定 ch 是否注册到此事件循环,因此具有关闭 ch 的权限
return;
}
// 只有在 ch 仍然注册到此 EventLoop 时才关闭 ch
// 如果 ch 已从事件循环中注销,则 SelectionKey 可能作为注销过程的一部分被取消注册
// 但是通道仍然健康且不应关闭。
// 详见 https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// 关闭 Channel
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// 如果 OP_CONNECT 位被设置,则调用 finishConnect() 完成连接
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 移除 OP_CONNECT 位,否则 Selector.select(..) 将总是立即返回而不阻塞
// 详见 https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 完成连接
unsafe.finishConnect();
}
// 先处理 OP_WRITE,因为我们可能能够写入一些排队的缓冲区,从而释放内存
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// 调用 forceFlush() 方法,该方法还会负责在没有剩余可写内容时清除 OP_WRITE 位
ch.unsafe().forceFlush();
}
// 如果 OP_READ 或 OP_ACCEPT 被设置,或者 readyOps 为 0,则进行读取操作
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
// 如果出现 CancelledKeyException 异常,则关闭 Channel
unsafe.close(unsafe.voidPromise());
}
}
runAllTasks()
/**
* 从任务队列中获取并运行所有任务。
*
* @return 如果至少运行了一个任务,则返回 true
*/
protected boolean runAllTasks() {
// 断言当前线程在事件循环中
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
// 从计划任务队列中获取任务
fetchedAll = fetchFromScheduledTaskQueue();
// 运行任务队列中的所有任务
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // 继续处理,直到获取所有计划任务。
// 如果至少运行了一个任务,则更新最后执行时间
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
// 在运行完所有任务后执行的操作
afterRunningAllTasks();
return ranAtLeastOne;
}
/**
* 从传入的任务队列中运行所有任务。
*
* @param taskQueue 要轮询和执行所有任务的任务队列。
*
* @return 如果至少执行了一个任务,则返回 true。
*/
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// 从任务队列中轮询出一个任务
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
// 执行任务
safeExecute(task);
// 继续轮询下一个任务
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
/**
* 从任务队列中轮询出一个任务。
*
* @param taskQueue 要轮询的任务队列。
*
* @return 任务队列中的下一个任务,如果没有任务则返回 null。
*/
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
// 从任务队列中轮询出一个任务
Runnable task = taskQueue.poll();
// 如果轮询出来的任务是 WAKEUP_TASK,则继续轮询下一个任务
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}
取出来的任务就是
private void register0(ChannelPromise promise) {
try {
// 检查通道是否仍然打开,因为在注册调用之外的时间内,通道可能已关闭
// 设置 ChannelPromise 为不可取消状态,以确保注册成功后无法取消
// 并且检查通道是否仍然打开
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// 判断是否是第一次注册
boolean firstRegistration = neverRegistered;
// 执行具体的注册逻辑
doRegister();
// 标记通道已经注册过
neverRegistered = false;
registered = true;
// 在实际通知 promise 之前,确保先调用 handlerAdded(...) 方法。这是必要的,因为用户可能已经通过管道触发了事件。
pipeline.invokeHandlerAddedIfNeeded();
// 设置注册成功,并通知 ChannelPromise
safeSetSuccess(promise);
// 触发 ChannelRegistered 事件,通知管道上下文
pipeline.fireChannelRegistered();
// 只有在通道尚未激活并且已经注册过一次时才触发 channelActive 事件,避免重复触发
if (isActive()) {
if (firstRegistration) {
// 如果是第一次注册,则触发 channelActive 事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 如果通道已经注册过且设置了自动读取,则重新开始读取,以便处理传入数据
beginRead();
}
}
} catch (Throwable t) {
// 发生异常时,直接关闭通道以避免 FD 泄漏
closeForcibly();
closeFuture.setClosed();
// 设置注册失败,并通知 ChannelPromise
safeSetFailure(promise, t);
}
}
doRegister();
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 尝试将通道注册到 EventLoop 的 Selector 上,关注的事件为 0,表示不关注任何事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// 强制 Selector 立即执行 select 操作,因为 "canceled" SelectionKey 可能仍然被缓存,尚未移除,因为尚未调用 Select.select(..) 操作。
eventLoop().selectNow();
selected = true;
} else {
// 我们在之前已经强制执行了一次选择操作,但是 SelectionKey 仍然因为某种原因被缓存了,可能是 JDK 的 bug
throw e;
}
}
}
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 尝试将通道注册到 EventLoop 的 Selector 上,关注的事件为 0,表示不关注任何事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// 强制 Selector 立即执行 select 操作,因为 "canceled" SelectionKey 可能仍然被缓存,尚未移除,因为尚未调用 Select.select(..) 操作。
eventLoop().selectNow();
selected = true;
} else {
// 我们在之前已经强制执行了一次选择操作,但是 SelectionKey 仍然因为某种原因被缓存了,可能是 JDK 的 bug
throw e;
}
}
}
}
熟悉的NIO代码
javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
pipeline.invokeHandlerAddedIfNeeded();
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// 已经注册到 EventLoop。现在是时候调用在注册完成之前添加的 ChannelHandlers 的回调方法了。
callHandlerAddedForAllHandlers();
}
}
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// 当前 Channel 已经注册。
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// 置空以便垃圾回收。
this.pendingHandlerCallbackHead = null;
}
// 必须在 synchronized(...) 块之外执行,否则 handlerAdded(...) 可能在持有锁的情况下被调用,
// 如果 handlerAdded(...) 尝试从 EventLoop 外部添加另一个处理程序,则可能产生死锁。
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
// 如果当前线程在事件循环中,则直接调用 handlerAdded0(ctx)。
callHandlerAdded0(ctx);
} else {
try {
// 否则,提交任务到事件执行器中。
executor.execute(this);
} catch (RejectedExecutionException e) {
// 如果任务被拒绝,则从处理程序链中移除该处理程序。
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
remove0(ctx);
ctx.setRemoved();
}
}
}
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// 调用处理程序的 handlerAdded 方法。
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
// 如果处理程序添加失败,尝试将其从管道中移除。
remove0(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
// 如果处理程序添加失败且已成功移除,则触发异常捕获事件,报告移除异常。
// 否则,触发异常捕获事件,报告添加异常和移除异常。
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// 调用处理程序的 handlerAdded 方法。
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
// 如果处理程序添加失败,尝试将其从管道中移除。
remove0(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
// 如果处理程序添加失败且已成功移除,则触发异常捕获事件,报告移除异常。
// 否则,触发异常捕获事件,报告添加异常和移除异常。
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
/**
* {@inheritDoc} 如果重写此方法,请确保调用 super!
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// 这应该总是为 true,根据我们当前的 DefaultChannelPipeline 实现。
// 在 handlerAdded(...) 中调用 initChannel(...) 的好处是,如果 ChannelInitializer 添加了另一个 ChannelInitializer,
// 不会出现排序意外。这是因为所有处理程序都将按预期的顺序添加。
if (initChannel(ctx)) {
// 完成 Channel 的初始化,现在移除初始化程序。
removeState(ctx);
}
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // 防止重入。
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// 在调用 initChannel(...) 之前显式调用 exceptionCaught(...)。
// 我们这样做是为了防止多次调用 initChannel(...)。
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // 防止重入。
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// 在调用 initChannel(...) 之前显式调用 exceptionCaught(...)。
// 我们这样做是为了防止多次调用 initChannel(...)。
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
// 获取管道
final ChannelPipeline pipeline = ch.pipeline();
// 获取配置的处理程序
ChannelHandler handler = config.handler();
// 如果存在处理程序,则将其添加到管道中
if (handler != null) {
pipeline.addLast(handler);
}
// 通过通道的事件循环执行一个新的Runnable任务
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加一个新的 ServerBootstrapAcceptor 到管道中
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
// 获取管道
final ChannelPipeline pipeline = ch.pipeline();
// 获取配置的处理程序
ChannelHandler handler = config.handler();
// 如果存在处理程序,则将其添加到管道中
if (handler != null) {
pipeline.addLast(handler);
}
// 通过通道的事件循环执行一个新的Runnable任务
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加一个新的 ServerBootstrapAcceptor 到管道中
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
pipeline.fireChannelRegistered();
@Override
public final ChannelPipeline fireChannelRegistered() {
// 通过头部上下文触发通道注册事件
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
/**
* 通过给定的下一个上下文触发通道注册事件,如果在事件循环中,直接调用,否则提交到事件执行器中执行。
*
* @param next 要触发通道注册事件的下一个上下文
*/
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 如果在事件循环中,直接调用通道注册事件
next.invokeChannelRegistered();
} else {
// 否则,提交到事件执行器中执行
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
/**
* 调用通道注册事件处理程序。
* 如果处理程序被调用成功,则调用通道注册方法;否则,继续触发下一个上下文的通道注册事件。
*/
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
// 如果处理程序被调用成功,则调用通道注册方法
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
// 处理程序调用时发生异常,通知异常处理程序
notifyHandlerException(t);
}
} else {
// 处理程序未被调用成功,继续触发下一个上下文的通道注册事件
fireChannelRegistered();
}
}
pipeline.fireChannelActive();
if (isActive()) {
if (firstRegistration) {
// 如果是首次注册,则触发通道激活事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 如果通道已经注册过,并且设置了自动读取,则重新开始读取以处理传入数据
beginRead();
}
}
@Override
public final ChannelPipeline fireChannelActive() {
// 调用头部上下文的 invokeChannelActive 方法来触发 ChannelActive 事件
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
// 如果当前线程是事件循环线程,则直接调用 invokeChannelActive 方法
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
// 否则,通过事件执行器异步执行 invokeChannelActive 方法
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
图都给你画好了,戳这里