前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JAVA语言异步非阻塞设计模式(原理篇)

JAVA语言异步非阻塞设计模式(原理篇)

作者头像
深度学习与Python
发布2023-04-01 16:15:44
9400
发布2023-04-01 16:15:44
举报
文章被收录于专栏:深度学习与python

作者 | 白宇(经授权转载自公众号有道技术团队)

编辑 | 刘振宇

本文主要讲解Java语言异步非阻塞模型的原理,以及核心设计模式“Promise”的基本特性。

1概述

异步非阻塞 [A] 是一种高性能的线程模型,在 IO 密集型系统中得到广泛应用。

在该模型下,系统发起耗时请求后不需要等待响应,期间可以执行其他操作;当收到响应后,系统收到通知并执行后续处理。由于消除了不必要的等待,这种模型能够充分利用 cpu、线程等资源,提高资源利用率。

然而,异步非阻塞模式在提升性能的同时,也带来了编码实现上的复杂性。请求和响应可能分离到不同线程中,需要编写额外代码完成响应结果的传递。Promise 设计模式 [B] 可以降低这种复杂性,封装数据传递、时序控制、线程安全等实现细节,从而提供简洁的 API 形式。

本文首先介绍异步非阻塞模式,从线程模型的角度分析阻塞和非阻塞模式的区别。之后介绍 Promise 设计模式的应用场景及工作流程。最后,提供一种简易的 Java 实现,能够实现基本的功能需求,并做到线程安全。

在正式探索技术问题之前,我们先来看看什么是异步非阻塞模型。如图 1-1 所示,展示了两个小人通信的场景:

图 1-1 两个小人通信

  1. 两个小人代表互相通信的两个线程,如数据库的客户端和服务端;他们可以部署在不同的机器上。
  2. 小人之间互相投递苹果,代表要传递的消息。根据具体业务场景,这些消息可能会称为 request、response、packet、document、record 等。
  3. 小人之间需要建立信道,消息才得以传递。根据场景,信道称为 channel、connection 等。

假设左侧小人发起请求,而右侧小人处理请求并发送响应:左侧小人先投出一个苹果 request,被右侧小人接收到;右侧小人进行处理后,再投出苹果 response,被左侧小人接收到。我们考察左侧小人在等待响应期间的行为,根据他在等待 response 期间是否能处理其他工作,将其归纳为“同步阻塞”和“异步非阻塞”两种模式。

首先我们看看同步阻塞式通信的流程,如图 1-2a 所示。

图 1-2a 同步阻塞式通信

  1. 投递。左侧小人投递 request,并等待接收 response。
  2. 等待。在等待接收 response 期间,左侧小人休息。不论是否还有其他 request 需要投递、是否还有其他工作需要处理,他都视若无睹,绝对不会因此打断休息。
  3. 响应。在收到 response 后,小人从休息中唤醒并处理 response。

接下来我们看看异步非阻塞式通信的流程,如图 1-2b 所示。

图 1-2b 异步非阻塞式通信

  1. 缓存。左侧小人投递 request,并等待接收 response 。和同步阻塞模式不同,小人并不需要亲手接住苹果 response,而是在地上放置一个盘子称为“buffer”;如果小人暂时不在场,那么所收到的苹果可以先存在盘子里,稍后再处理。
  2. 暂离。由于有盘子 buffer 的存在,小人投递 request 后就可以暂时离开,去处理其他工作,当然也可以去投递下一个 request;如果需要向不同的 channel 投递 request ,那么小人可以多摆放几个盘子,和 channel 一一对应。
  3. 响应。小人离开后,一旦某个盘子收到了 response ,一只“大喇叭”就会响起,发出“channelRead”通知,呼唤小人回来处理 response。如果要处理多个 response 或多个 channel,那么 channelRead 通知还需要携带参数,以说明从哪个 channel 上收到了哪个 response。

这里的大喇叭可以用 NIO 或 AIO 来实现。简单来说,NIO 是指不停地轮询每个盘子,一旦看到苹果就发出通知;AIO 是指在收到苹果时直接触发通知,而没有轮询的过程。

当然,本系列文章的读者并不需要了解更多实现细节,只需知道异步非阻塞模式依赖于“大喇叭”来实现,它替代小人等待接收 response,从而解放小人去处理其他工作。

根据上面的分析,同步模式具有下列严重缺点

  1. 同步阻塞模式的工作效率十分低下。小人大部分时间都在休息,仅当投递请求、处理响应时,才偶尔醒来工作一小会;而在异步非阻塞模式下,小人从不休息,马不停蹄地投递请求、处理响应,或处理其他工作。
  2. 同步阻塞模式会带来延迟。

我们考虑下面两种情况,如图 1-3 所示。

a. channel 复用,即左侧小人在一个 channel 上连续发送多条消息。在同步阻塞模式下,一轮(即请求 + 响应)只能投递一个请求(苹果 1),而后续请求(苹果 2-4)都只能排队等待,右侧小人需要等待很多轮才能收到所期望的全部消息。此外,左侧小人在等待接收某个 response 期间,没有机会处理收到的其他消息,造成了数据处理的延迟。不得不感慨,左侧小人太懒惰了!图片

图 1-3a channel 复用

b. 线程复用,即一个线程(小人)向多个 channel 发送消息(苹果 1-3,分别发向不同 channel)。左侧小人同一时刻只能做一件事,要么在工作,要么在休息;他投递了苹果 1 后就躺下休息,等待响应,全然不顾右侧小人 2、3 还在等待他们想要的苹果 2、3。图片

图 1-3b 线程复用

在这一章里我们用漫画的形式,初步体验了同步阻塞模式与异步非阻塞模式,并分析了两种模式的区别。接下来我们从 Java 线程入手,对两种模式进行更加正式、更加贴近实际的分析。

2异步非阻塞模型

2.1Java 线程状态

在 Java 程序中,线程是调度执行的单元。线程可以获得 CPU 使用权来执行代码,从而完成有意义的工作。工作进行期间,有时会因为等待获取锁、等待网络 IO 等原因而暂停,通称“同步”或“阻塞”;如果多项工作能够同时进行,之间不存在约束、不需要互相等待,这种情况就称为“异步”或“非阻塞”

受限于内存、系统线程数、上下文切换开销,Java 程序并不能无限创建线程;因此,我们只能创建有限个线程,并尽量提高线程的利用率,即增加其工作时长、降低阻塞时长。异步非阻塞模型是减少阻塞、提高线程利用率的有效手段。当然,这种模型并不能消除所有的阻塞。我们首先来看看 Java 线程有哪些状态 [C],其中哪些阻塞是必要的,哪些阻塞可以避免。

Java 线程状态包括

  • RUNNABLE:线程在执行有意义的工作

如图 2-1a,线程如果在执行纯内存运算,那么处于 RUNNABLE 状态

根据是否获得 cpu 使用权,又分为两个子状态:READY、RUNNING

  • BLOCKED/WAITING/TIMED_WAITING:线程正在阻塞

如图 2-1b、2-1c、2-1d,根据阻塞原因,线程处于下列状态之一:

  • BLOCKED:synchronized 等待获取锁;
  • WAITING/TIMED_WAITING:Lock 等待获取锁。两种状态的区别为是否设置超时时长。

图 2-1 Java 线程状态

此外,如果 Java 线程正在进行网络 IO,则线程状态为 RUNNABLE,但是实际上也发生了阻塞。以 socket 编程为例,如图 2-2 所示,在收到数据之前 InputStream.read() 会阻塞,此时线程状态为 RUNNABLE。

图 2-2 网络 IO

综上,Java 线程状态包括:RUNNABLE、BLOCKED、WAITING、TIMED_WAITING。其中,RUNNABLE 状态又分为内存计算(非阻塞)、网络 IO(阻塞)两种情况,而其余状态都是阻塞的。

根据阻塞原因,本文将 Java 线程状态归纳为以下 3 类:RUNNABLE、IO、BLOCKED

  1. RUNNABLE:Java 线程状态为 RUNNABLE,并且在执行有用的内存计算,无阻塞;
  2. IO:Java 线程状态为 RUNNABLE,但是正在进行网络 IO,发生阻塞;
  3. BLOCKED:Java 线程状态为 BLOCKED/WAITING/TIMED_WAITING,在并发工具的控制下,线程等待获取某一种锁,发生阻塞。

提高线程利用率,就要增加线程处于 RUNNABLE 状态的时长,降低处于 IO 和 BLOCKED 状态的时长。BLOCKED 状态一般是不可避免的,因为线程间需要通信,需要对临界区进行并发控制;但是,如果采用适当的线程模型,那么 IO 状态的时长就可以得到降低,而这就是异步非阻塞模型。

2.2 线程模型:阻塞 vs 非阻塞

异步非阻塞模型能够降低 IO 阻塞时长,提高线程利用率。下面以数据库访问为例,分析同步和异步 API 的线程模型。如图 3 所示,过程中涉及 3 个函数

  1. writeSync() 或 writeAsync():数据库访问,发送请求
  2. process(result):处理服务器响应(以 result 表示)
  3. doOtherThings():任意其他操作,逻辑上不依赖服务器响应

同步 API 如图 2-3a 所示:调用者首先发送请求,然后在网络连接上等待来自服务器的响应数据。API 会一直阻塞,直至收到响应才返回;期间调用者线程无法执行其他操作,即使该操作并不依赖服务器响应。实际的执行顺序为:

  1. writeSync()
  2. process(result)
  3. doOtherThings() // 直至收到结果,当前线程才能执行其他操作

异步 API 如图 2-3b 所示:调用者发送请求并注册回调,然后 API 立刻返回,接下来调用者可以执行任意操作。稍后底层网络连接收到响应数据,触发调用者所注册的回调。实际的执行顺序为:

  1. writeAsync()
  2. doOtherThings() // 已经可以执行其他操作,并不需要等待响应
  3. process(result)

图 2-3 同步 API & 异步 API

在上述过程中,函数 doOtherThings() 并不依赖服务器响应,原则上可以和数据库访问同时执行。然而对于同步 API,调用者被迫等待服务器响应,然后才可以执行 doOtherThings();即数据库访问期间线程阻塞于 IO 状态,无法执行其他有用的操作,利用率十分低下。而异步 API 就没有这个限制,显得更加紧凑、高效。

在 IO 密集型系统中,适当使用异步非阻塞模型,可以提升数据库访问吞吐量。

考虑这样一个场景:需要执行多条数据库访问请求,且请求之间互相独立,无依赖关系。使用同步 API 和异步 API ,线程状态随时间变化的过程如图 2-4 所示。

图 2-4 线程时间线:数据库访问

线程交替处于 RUNNABLE 和 IO 状态。在 RUNNABLE 状态下,线程执行内存计算,如提交请求、处理响应。在 IO 状态下,线程在网络连接上等待响应数据。在实际系统中,内存计算的速度非常快,RUNNABLE 状态的时长基本可忽略;而网络传输的耗时会相对更长(几十到几百毫秒),IO 状态的时长更加可观。

  1. 同步 API:调用者线程一次只能提交一个请求;直到请求返回后,才能再提交下一个请求。线程利用率很低,大部分时间消耗在 IO 状态上。
  2. 异步 API:调用者线程可以连续提交多个请求,而之前提交的请求都还没有收到响应。调用者线程会注册一些回调,这些回调存储在内存中;稍后网络连接上收到响应数据,某个接收线程被通知处理响应数据,从内存中取出所注册的回调,并触发回调。这种模型下,请求可以连续地提交、连续的响应,从而节约 IO 状态的耗时。

异步非阻塞模式在 IO 密集型系统中应用非常广泛。常用的中间件,如 http 请求 [D]、redis[E]、mongo DB[F]、elasticsearch[G]、influx DB[H],都支持异步 API。各位读者可以在参考文献中,查阅这些异步 API 的样例代码。关于中间件的异步 API ,下面有几个注意事项

  1. redis 的常见客户端有 jedislettuce[E] 。其中 lettuce 提供了异步 API,而 jedis 只能提供同步 API ;二者对比参见文章 [I]。
  2. kafka producer[J] 的 send() 方法也支持异步 API ,但是该 API 实际上不是纯异步的 [K]:当底层缓存满,或者无法获取服务器(broker)信息时,send() 方法会发生阻塞。个人认为这是一个非常严重的设计缺陷。kafka 常用于低延迟日志采集场景,系统会将日志通过网络写入到 kafka 服务器,以减少线程内的阻塞,提升线程吞吐量;稍后其他进程会从 kafka 消费所写入的日志,进行持久存储。设想一个实时通信系统,单条线程每秒需要处理几万到几十万条消息,响应时间一般为几毫秒到几十毫秒。系统在处理期间需要经常调用 send() 来上报日志,如果每次调用都发生哪怕 1 秒的延迟(实际有可能达几十秒),延迟积累起来也会严重劣化吞吐量和延迟。

最后,异步 API 有多种实现,包括线程池、select(如 netty 4.x[L])、epoll 等。其共同点是调用者不需要在某一条网络连接上阻塞,以等待接收数据;相反,API 底层常驻有限数目的线程,当收到数据后,某一线程得到通知并触发回调。这种模型也称为“响应式”模型,非常贴切。限于篇幅原因,本文主要关注异步 API 设计,而不深入讲解异步 API 的实现原理。

3Promise 设计模式

3.1 API 形式:同步、异步 listener、异步 Promise

上一章介绍了异步非阻塞模式和异步 API 的函数形式。异步 API 具有以下特征

  1. 在提交请求时注册回调;
  2. 提交请求后,函数立刻返回,不需要等待收到响应;
  3. 收到响应后,触发所注册的回调;根据底层实现,可以利用有限数目的线程来接收响应数据,并在这些线程中执行回调。

在保留异步特性的基础上,异步 API 的形式可以进一步优化。上一章图 2-3b 展示了异步 API 的 listener 版本,特点是在提交请求时必须注册恰好一个回调;因而在下列场景下,listener API 会难以满足功能需求,需要调用者做进一步处理:

  1. 多个对象都关注响应数据,即需要注册多个回调;但是 listener 只支持注册一个回调。
  2. 需要将异步调用转为同步调用。例如某些框架(如 spring )需要同步返回,或者我们希望主线程阻塞直至操作完成,然后主线程结束、进程退出;但是 listener 只支持纯异步,调用者需要重复编写异步转同步的代码。

为了应对上述场景,我们可以使用 Promise 设计模式来重构异步 API ,以支持多个回调和同步调用。下面对同步 API、异步 listener API、异步 Promise API 的函数形式进行对比,如图 3-1 所示:

  1. 同步:调用 writeSync() 方法并阻塞;收到响应后函数停止阻塞,并返回响应数据;
  2. 异步 listener:调用 writeAsync() 方法并注册 listener,函数立刻返回;收到响应后,在其他线程触发所注册的 listener;
  3. 异步 Promise:调用 writeAsync(),但不需要在函数中注册 listener,函数立刻返回 Promise 对象。调用者可以调用异步的 Promise.await(listener),注册任意数目的 listener,收到响应后会按顺序触发;此外,也可以调用同步的 Promise.await() ,阻塞直至收到响应。

图 3-1 API 形式:同步、异步 listener、异步 Promise

综上,Promise API 在保持异步特性的前提下,提供了更高的灵活性。调用者可以自由选择函数是否阻塞,以及注册任意数目的回调。

3.2 Promise 的特性与实现

上一节介绍了 Promise API 的使用样例,其核心是一个 Promise 对象,支持注册 listener,以及同步获取响应 result;而本节将对 Promise 的功能进行更加详细的定义。注意,本节并不限定 Promise 的某一具体实现(例:jdk CompletableFuture、netty DefaultPromise),只展示共有的、必须具备的特性;缺少这些特性,Promise 将无法完成异步传递响应数据的工作。

 3.2.1 功能特性

  • Promise 的基本方法

Promise 的基本功能是传递响应数据,需要支持下列方法,如表 3-1 所示:

下面以上一小节的数据库访问 API 为例,演示 Promise 的工作流程,如图 3-2 所示:

  1. 调用者调用 writeAsync() API ,提交数据库访问请求并获取 Promise 对象;然后调用 Promise.await(listener),注册对响应数据的 listener。Promise 对象也可以传递给程序中其他地方,使得关心响应数据的其他代码,各自注册更多 listener。
  2. writeAsync() 内部,创建 Promise 对象并和这次请求关联起来,假设以 requestId 标识。
  3. writeAsync() 底层常驻有限数目的线程,用于发送请求和接收响应。以 netty 为例,当从网络上收到响应据后,其中一个线程得到通知,执行 channelRead() 函数进行处理;函数取出响应数据和对应的 Promise 对象,并调用 Promise.signalAll() 进行通知。注意这里是伪代码,和 netty 中回调函数的实际签名略有区别。

图 3-2a 提交数据库访问请求

图 3-2b 创建 Promise 对象

图 3-2c 通知 Promise 对象

  • Promise 的时序

Promise 的方法需要保证以下时序。此处以“A 对 B 可见”来描述时序,即:如果先执行操作 A(注册 listener)就会产生某种永久效应(永久记录这个 listener),之后再执行操作 B(通知 result)就必须考虑到这种效应,执行相应的处理(触发之前记录的 listener)。

  1. await(listener) 对 signalAll(result) 可见:注册若干 listener 后,通知 result 时必须触发每一个 listener,不允许遗漏。
  2. signalAll(result) 对 await(listener) 可见:通知 result 后,再注册 listener 就会立刻触发。
  3. 首次 signalAll(result) 对后续 signalAll(result) 可见。首次通知 result 后,result 即唯一确定,永不改变。之后再通知 result 就会忽略,不产生任何副作用。请求超时是该特性一种典型应用:在提交请求的同时创建一个定时任务;如果能在超时时长内正确收到响应数据,则通知 Promise 正常结束;否则定时任务超时,通知 Promise 异常结束。不论上述事件哪个先发生,都保证只采纳首次通知,使得请求结果唯一确定。

此外,某次 await(listener) 最好对后续 await(listener) 可见,以保证 listener 严格按照注册顺序来触发。

  • Promise 的非线程安全实现

如不考虑线程安全,那么下列代码清单可以实现 Promise 的基本特性;线程安全的实现见下一小节。代码清单依次展示了 await(listener): void、signalAll(result)、await(): result 的实现。这里有几个注意事项:

  1. 字段 listeners 存储 await(listener) 所注册的 listener 。字段类型为 LinkedList,以存储任意数目的 listener,同时维护 listener 的触发顺序。
  2. 字段 isSignaled 记录是否通知过 result。如果 isSignaled=true,则后续调用 await(listener) 时立刻触发 listener,且后续调用 signalAll(result) 时直接忽略。此外,我们以 isSignaled=true 而不是 result=null 来判断是否通知过 result ,因为某些情况下 null 本身也可以作为响应数据。例如,我们以 Promise表示数据库写入的结果,通知 null 表示写入成功,通知 Exception 对象(或某一子类)表示失败原因。
  3. signalAll(T result) 在末尾处调用 listeners.clear() 以释放内存,因为 listeners 已经触发过,不再需要在内存中存储。
代码语言:javascript
复制
public class Promise<T> {
    private boolean isSignaled = false;    private T result;
    private final List<Consumer<T>> listeners = new LinkedList<>();
    public void await(Consumer<T> listener) {        if (isSignaled) {            listener.accept(result);            return;        }
        listeners.add(listener);    }
    public void signalAll(T result) {        if (isSignaled) {            return;        }
        this.result = result;        isSignaled = true;        for (Consumer<T> listener : listeners) {            listener.accept(result);        }        listeners.clear();    }
    public T await() {        // 适当阻塞,直至signalAll()被调用;实际实现见3.2.2节        return result;    }}

 3.2.2 线程安全特性

上一章 3.2.1 节讲解了 Promise 的功能,并提供了非线程安全的实现。本节展示如何使用并发工具,实现线程安全的 Promise,如下所示。有下列几个注意事项

  1. 线程安全。各个字段均被多个线程访问,因此都属于临界区,需要使用适当的线程安全工具进行上锁,如 synchronized、Lock 。一种最简单的实现,是将全部代码纳入临界区内,进入方法时上锁,离开方法时放锁。注意在使用 return 进行提前返回时,不要忘记放锁。
  2. 在临界区外触发 listener,以减少在临界区内停留的时长,并减少潜在的死锁风险。
  3. 同步 await() 。可以使用任何一种同步等待的工具来实现,如 CountDownLatch、Condition。此处使用 Condition 实现,注意根据 java 语法,操作 Condition 时必须先获取 Condition 所关联的锁。
代码语言:javascript
复制
public class Promise<T> {
    private final ReentrantLock lock = new ReentrantLock();    private final Condition resultCondition = lock.newCondition();
    private boolean isSignaled = false;    private T result;
    private final List<Consumer<T>> listeners = new LinkedList<>();
    public void await(Consumer<T> listener) {        lock.lock();        if (isSignaled) {            lock.unlock(); // 不要忘记放锁            listener.accept(result); // 在临界区外触发listener            return;        }
        listeners.add(listener);        lock.unlock();    }
    public void signalAll(T result) {        lock.lock();        if (isSignaled) {            lock.unlock(); // 不要忘记放锁            return;        }
        this.result = result;        isSignaled = true;
        // this.listeners的副本        List<Consumer<T>> listeners = new ArrayList<>(this.listeners);        this.listeners.clear();        lock.unlock();
        for (Consumer<T> listener : listeners) {            listener.accept(result); // 在临界区外触发listener        }
/* 操作Condition时须上锁*/        lock.lock();        resultCondition.signalAll();        lock.unlock();    }
    public T await() {        lock.lock();        if (isSignaled) {            lock.unlock(); // 不要忘记放锁            return result;        }
        while (!isSignaled) {            resultCondition.awaitUninterruptibly();        }        lock.unlock();
        return result;    }}

上述实现仅做演示使用,仍有较大的改进空间。生产环境的实现原理,读者可以参考 jdk CompletableFutre、netty DefaultPromise 。可以改进的地方包括:

  1. 使用 CAS 设置响应数据。字段 isSignaled、result 可以合并为一个数据对象,然后使用 CAS 进行设值,从而进一步降低阻塞时长。
  2. 触发 listener 的时序。在上述代码中,Promise.signalAll() 会依次触发 listener;在此期间,如果其他线程调用了异步 await(listener),由于 Promise 的响应数据已概括,该线程也会触发 listener。上述过程中,两个线程同时触发 listener,因此没有严格保证触发顺序。作为改进,类似于 netty DefaultPromise,Promise.signalAll() 内部可以设置一个循环,不断触发 listener 直至 listeners 排空,以防期间注册了新的 listener;在此期间,新注册的 listener 可以直接加入到 listeners 中,而不是立刻触发。
  3. listener 的移除。在通知响应数据之前,Promise 长期持有 listener 的引用,导致 listener 对象无法被 gc 。可以添加 remove(listener) 方法,或者允许仅持有 listener 的弱引用。

 3.2.3 须避免的特性

前面的小节展示了 Promise 的特性与实现原理。纯正的 Promise 是异步传递响应数据的工具,其应当只实现必要的数据传递特性,而不应当夹杂请求提交、数据处理等逻辑。接下来我们来看一看,Promise 在实现时应避免哪些特性,以防限制调用者所能做出的决策。

  1. 异步 await() 发生阻塞;该规则不仅适用于 Promise,也适用于任何异步 API。异步 API 常用于实时通信等延时敏感的场景,作用是减少线程阻塞,避免推迟后续其他操作。一旦发生阻塞,系统的响应速度和吞吐量就会受到严重冲击。

以连续提交数据库请求为例。如图 3-3a 所示,调用者调用了一个异步 API,连续提交 3 次写入请求,并在所返回的 Promise 上注册回调。

我们考察 writeAsync() 与 await() 如发生阻塞,将会对调用者造成什么影响,如图 3-3b 所示。提交请求是纯内存操作,线程处于 RUNNABLE 状态;writeAsync() 或 await() 如果发生阻塞,则线程处于 BLOCKED 状态,暂停工作而无法执行后续操作。当发生阻塞时,调用者每提交一个请求就不得不等待一段时间,从而降低了提交请求的频率,进而推迟了服务器对这些请求的响应,使得系统的吞吐量降低、延迟上升。特别地,如果系统采用了多路复用机制,即一个线程可以处理多个网络连接或多个请求,那么线程阻塞将会严重拖慢后续请求的处理,造成比较难排查的故障。

常见的阻塞原因包括:

  1. Thread.sleep()
  2. 向队列提交任务,调用了 BlockingQueue.put() 和 take();应改为非阻塞的 offer() 和 poll()
  3. 向线程池提交任务,ExecutorService.submit(),如果线程池拒绝策略为 CallerRunsPolicy,而任务本身又是耗时的。
  4. 调用了阻塞的函数,包括:InputStream.read()、同步的 Promise.await() 、KafkaProducer.send() 。注意 KafkaProducer.send() 虽然形式上是异步 API,但是在底层缓存满或者无法获取服务器(broker)信息时,send() 方法仍会发生阻塞。

图 3-3a 连续提交请求

图 3-3b 请求处理时间线

  1. 绑定线程池( ExecutorService ),用于执行请求。如图 3-4 所示,线程池是异步 API 的一种可选模型,但并不是唯一实现。
  2. 线程池模型。为了不阻塞调用者,API 内置了线程池来提交请求、处理响应;调用者可以向线程池连续提交多个请求,但是不需要等待响应。调用者提交一条请求后,线程池中的某条线程就会被独占,等待接收响应并进行处理,但在此之前无法再处理其他请求;完成处理后,该条线程重新变为空闲,可以继续处理后续请求。
  3. 响应式模型。类似地,API 内置了发送和接收线程来提交请求、处理响应,调用者也不需要同步等待。调用者提交一条请求后,发送线程向网络发送请求;完成发送后,线程立刻变为空闲,可以发送后续请求。当收到响应数据时,接收线程得到通知以处理响应;完成处理后,线程立刻变为空闲,可以处理后续响应数据。上述过程中,任何一条线程都不会被某一请求独占,即线程随时都可以处理请求,而不需要等待之前的请求被响应。

综上,如果绑定了线程池,Promise 就实现了对其他模型(如响应式模型)的兼容性。

图 3-4 线程时间线:线程池 vs 响应式

  1. 在构造方法创建 Promise 对象时,定义如何提交请求。这种方式只能定义如何处理单条请求,而无法实现请求的批量处理。

以数据库访问为例,现代数据库一般支持批量读写,以略微提升单次访问的延迟为代价,换来吞吐量显著提升;如果吞吐量得到提升,那么平均延迟反而会下降。下面的代码片段展示了一个批量请求 API :数据对象 BulkRequest 可以携带多条普通请求 Request ,从而实现批量提交。

代码语言:javascript
复制
/* 提交单条请求*/client.submit(new Request(1));client.submit(new Request(2));client.submit(new Request(3));
/* 提交批量请求*/client.submit(new BulkRequest(        new Request(1),        new Request(2),        new Request(3)));

为了充分利用“批量请求”的特性,调用者需要进行跨越多条请求的“宏观调控”。请求产生后可以先缓存起来;等待一段时间后,取出所缓存的多条请求,组装一个批量请求来一并提交。因此,如下面的代码片段所示,在构造 Promise 时指定如何提交单条请求是没有意义的,这部分代码(client.submit(new Request(...)))并不会被执行;而实际希望执行的代码,其实是提交批量请求(client.submit(new BulkRequest(...)))。

代码语言:javascript
复制
/* Promise:提交单条请求*/new Promise<>(() -> client.submit(new Request(1)));new Promise<>(() -> client.submit(new Request(2)));new Promise<>(() -> client.submit(new Request(3)));
  1. 在构造方法创建 Promise 对象时,定义如何处理响应数据,而不允许后续对响应数据注册回调。如下面的代码片段所示,在构造 Promise 对象时,注册了对响应数据的处理 process(result);但是除此以外,其他代码也有可能关心响应数据,需要注册回调 process1(result)、process2(result)。如果 Promise 只能在构造时注册唯一回调,那么其他关注者就无法注册所需回调函数,即 Promise API 退化回 listener API。
代码语言:javascript
复制
/* 定义如何处理响应数据*/Promise<String> promise = new Promise<>(result -> process(result));
/* 其他代码也关心响应数据*/promise.await(result -> process1(result));promise.await(result -> process2(result));

综上,Promise 应该是一个纯粹的数据对象,其职责是存储回调函数、存储响应数据;同时做好时序控制,保证触发回调函数无遗漏、保证触发顺序。除此以外,Promise 不应该和任何实现策略相耦合,不应该杂糅提交请求、处理响应的逻辑。

4总结

本文讲解了异步非阻塞设计模式,并对同步 API、异步 listener API、异步 Promise API 进行了对比。相比于其他两种 API,Promise API 具有无可比拟的灵活性,调用者可以自由决定同步返回还是异步返回,并允许对响应数据注册多个回调函数。最后,本文讲解了 Promise 基本功能的实现,并初步实现了线程安全特性。

本系列共 2 篇文章,本文为第 1 篇《原理篇》。在下一篇《应用篇》中,我们将看到 Promise 设计模式丰富的应用场景,将其和现有工具进行结合或对比,以及对 Promise API 进行进一步变形和封装,提供异常处理、调度策略等特性。

如果小伙伴们想要进一步交流,可以添加有道技术团队助手(ydtech01)与我们联络。

5参考文献

[A] 异步非阻塞 IO

https://en.wikipedia.org/wiki/Asynchronous_I/O

[B] Promise

https://en.wikipedia.org/wiki/Futures_and_promises

[C] java 线程状态

https://segmentfault.com/a/1190000038392244

[D] http 异步 API 样例:apache HttpAsyncClient

https://hc.apache.org/httpcomponents-asyncclient-4.1.x/quickstart.html

[E] redis 异步 API 样例:lettuce

https://github.com/lettuce-io/lettuce-core/wiki/Asynchronous-API

[F] mongo DB 异步 API 样例:AsyncMongoClient

https://mongodb.github.io/mongo-java-driver/3.0/driver-async/getting-started/quick-tour/

[G] elasticsearch 异步 API 样例:RestHighLevelClient

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-index.html

[H] influx DB 异步 API 样例:influxdb-java

https://github.com/influxdata/influxdb-java/blob/master/MANUAL.md

[I] jedis vs lettuce

https://redislabs.com/blog/jedis-vs-lettuce-an-exploration/

[J] kafka

http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html

[K] KafkaProducer.send() 阻塞

https://stackoverflow.com/questions/57140680/kafka-asynchronous-send-not-really-asynchronous

[L] netty

https://netty.io/wiki/user-guide-for-4.x.html

本周好文推荐

有钱没人!人才短缺将成云计算快速发展的致命弱点

我的开源代码被科技巨头偷了,对方还跑到我面前演示

七部委进驻滴滴;活久见!腾讯阿里讲和;京东宣布全员涨薪两个月;IPIP.net状告阿里云 | Q资讯

CentOS 8退役倒计时,开发者们又吵起来了


InfoQ 写作平台欢迎所有热爱技术、热爱创作、热爱分享的内容创作者入驻!

还有更多超值活动等你来!

扫描下方二维码

填写申请,成为作者

开启你的创作之路吧~

点个在看少个 bug 👇

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-07-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 InfoQ 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档