前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程 | 技术创作特训营第一期

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程 | 技术创作特训营第一期

原创
作者头像
sidiot
修改2023-08-22 08:00:47
4.1K10
修改2023-08-22 08:00:47
举报
文章被收录于专栏:技术大杂烩技术大杂烩

介绍

在实际应用中,当客户端尝试连接服务器时,可能会面临多种原因导致连接失败的情况。为了避免无限等待,我们可以在客户端代码中设置一个超时连接时间 CONNECT_TIMEOUT_MILLIS,该时间表示客户端尝试连接服务器的最长时间限制,如果在指定的超时时间内未能成功建立连接,客户端应该主动抛出连接超时的异常。

代码语言:java
复制
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)

上述代码的作用是设置连接超时时间为 1000 毫秒,这个选项用于指定连接建立的最大时间,如果超过该时间仍未建立连接,则会放弃连接尝试。

运行结果:

然而,当服务器没有启动时,且连接超时时间大于 2 秒钟时,则会抛出连接被拒绝的异常,运行结果如下所示:

这是 Java 底层的网络异常。

需要完整代码的读者请访问博主的 Github:TestConnectionTimeout

接下来让我们探索 connect()ChannelFuture.sync() 的执行过程。

connect 源码解析

我们先来探究成功执行连接超时所进行的过程,核心方法 connect() 的部分源码如下所示:

代码语言:java
复制
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    try {
        ...
        // Schedule connect timeout.
        int connectTimeoutMillis = config().getConnectTimeoutMillis();
        if (connectTimeoutMillis > 0) {
            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                    ConnectTimeoutException cause =
                            new ConnectTimeoutException("connection timed out: " + remoteAddress);
                    if (connectPromise != null && connectPromise.tryFailure(cause)) {
                        close(voidPromise());
                    }
                }
            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        ...
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

上述代码的主要内容是根据配置获取连接超时时间,并使用事件循环调度一个定时任务,在指定的时间内检查连接是否超时。如果连接超时,会触发一个 ConnectTimeoutException 异常,并尝试向 connectPromise 发送连接超时的失败信息;否则,连接超时任务被取消,通道关闭。

那主线程是如何知道消息的呢?其实是通过 connectPromise 进行传递消息,我们可以在主线程中标记一下 future,如下图所示:

然后切换至 NIO 线程,可以发现 connectPromise 也被标记了,说明他们共属于一个主体,如下图所示:

如果不是很了解 FuturePromise 之间的联系的话,可以阅读博主的另一篇文章:异步编程模型:利用 Future 和 Promise 提高性能与响应能力

在上述事例中,我们设置了两秒钟的连接超时时间,由于两秒钟内客户端并没有与服务器建立连接,因此触发了定时任务,执行了 run() 方法,抛出了连接超时异常 ConnectTimeoutException

ChannelFuture.sync() 执行过程解析

下面是 ChannelFuture.sync() 方法的执行过程:

  1. 调用 ChannelFuture.sync() 方法将当前线程阻塞,直到对应的操作完成或发生异常。
  2. sync() 方法内部,会获取当前线程绑定的 EventLoop 对象,然后将当前任务包装成一个特殊的 Promise 对象。
  3. Promise 对象会被注册到 EventLoop 中的任务队列中,等待执行。EventLoop 会按顺序从任务队列中取出任务并执行。
  4. 一旦 Promise 执行完成,即异步操作完成或发生异常,sync() 方法会解除当前线程的阻塞状态,并返回操作的结果或抛出异常。
  5. 操作成功完成,可以通过 ChannelFuture.isSuccess() 方法检查操作是否成功。如果成功,可以继续执行后续的操作;如果失败,可以通过 ChannelFuture.cause() 方法获取失败的原因。

需要注意的是,由于 ChannelFuture.sync() 是一个同步阻塞方法,如果在事件循环线程中调用该方法,可能会导致死锁或性能问题。因此,通常建议在其他线程中使用 ChannelFuture.addListener() 方法注册监听器来处理异步操作的结果,而不是直接使用 sync() 方法。

sync 源码解析

首先使用 super.sync() 调用了父类的 sync() 方法,将当前对象作为结果返回。

代码语言:java
复制
@Override  
public ChannelPromise sync() throws InterruptedException {  
    super.sync();  
    return this;  
}

上述代码的目的是在执行特定的同步操作后,返回当前的 ChannelPromise 对象。在这种情况下,子类通过调用父类的 sync() 方法来实现同步操作,并在执行完成后返回当前对象,以便支持链式调用或其他需要获取该对象的操作。

然后在父类的 sync() 方法中,调用 await()rethrowIfFailed() 来实现同步等待和异常检查,并返回当前对象。

代码语言:java
复制
@Override  
public Promise<V> sync() throws InterruptedException {  
    await();  
    rethrowIfFailed();  
    return this;  
}

在之后的几个方法中,就不对子类做过多的介绍了。

await 源码解析

await 方法是一种等待机制的实现,它通过检查承诺是否已完成,处理中断异常以及使用同步块和等待机制来让线程等待承诺的完成。

代码语言:java
复制
@Override  
public Promise<V> await() throws InterruptedException {  
    if (isDone()) {  
        return this;  
    }  
    // 处理线程中断
    if (Thread.interrupted()) {  
        throw new InterruptedException(toString());  
    }  
    // 检查死锁
    checkDeadLock();  

    synchronized (this) {  
        while (!isDone()) {  
            incWaiters();  
            try {  
                wait();  
            } finally {  
                decWaiters();  
            }  
        }  
    }  
    return this;  
}

在上述代码中,如果 isDone() 方法返回 true,说明该承诺已经完成,直接返回当前对象。

Thread.interrupted() 用于检查当前线程是否被中断,如果是,则抛出 InterruptedException 异常,并将当前对象的字符串表示作为异常消息。

checkDeadLock() 方法用于检查是否存在死锁情况。

对于 synchronized (this) {...} 代码块,使用当前对象作为同步锁,确保在多线程环境下只有一个线程可以进入代码块。其中,该代码块核心为当承诺未完成时,一直执行循环。

在循环内部,调用 incWaiters() 方法增加等待中的线程计数器。同时,调用 wait() 方法,使当前线程进入等待状态,直到其他线程调用该对象的 notify()notifyAll() 方法唤醒。但无论如何,最终都会执行 decWaiters() 方法来减少等待中的线程计数器。

接下来,我们看看 isDone() 方法的具体实现。

isDone 源码解析

代码语言:java
复制
private static boolean isDone0(Object result) {  
    return result != null && result != UNCANCELLABLE;  
}

上述代码主要作用是判断给定的 result 是否满足完成的条件。

【选题思路】

在日常使用 Netty 中,连接超时是我们经常会遇见的一个问题,因此通过深入分析 ChannelFuture.sync() 方法的执行过程,对 connect 源码的解析,让我们了解到在超时连接设置中发挥作用的一些代码。

这些源码解析的过程帮助我们更好地理解了 ChannelFuture.sync() 方法的执行流程,并且使我们能够更好地降低意外情况的发生率,并提高系统的稳定性和可靠性。

【创作提纲】

1、介绍连接超时运用场景及处理状况;

2、讲解核心函数 connect 的源码;

3、讲解 ChannelFuture.sync 执行过程中的 sync 源码;

4、讲解 ChannelFuture.sync 执行过程中的 await 源码;

5、讲解 ChannelFuture.sync 执行过程中的 isDone 源码;

6、总结;

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • connect 源码解析
  • ChannelFuture.sync() 执行过程解析
    • sync 源码解析
      • await 源码解析
        • isDone 源码解析
        • 【选题思路】
        • 【创作提纲】
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档