作者 | 白宇(经授权转载自公众号有道技术团队)
编辑 | 刘振宇
本文主要讲解Java语言异步非阻塞模型的原理,以及核心设计模式“Promise”的基本特性。
1概述
异步非阻塞 [A] 是一种高性能的线程模型,在 IO 密集型系统中得到广泛应用。
在该模型下,系统发起耗时请求后不需要等待响应,期间可以执行其他操作;当收到响应后,系统收到通知并执行后续处理。由于消除了不必要的等待,这种模型能够充分利用 cpu、线程等资源,提高资源利用率。
然而,异步非阻塞模式在提升性能的同时,也带来了编码实现上的复杂性。请求和响应可能分离到不同线程中,需要编写额外代码完成响应结果的传递。Promise 设计模式 [B] 可以降低这种复杂性,封装数据传递、时序控制、线程安全等实现细节,从而提供简洁的 API 形式。
本文首先介绍异步非阻塞模式,从线程模型的角度分析阻塞和非阻塞模式的区别。之后介绍 Promise 设计模式的应用场景及工作流程。最后,提供一种简易的 Java 实现,能够实现基本的功能需求,并做到线程安全。
在正式探索技术问题之前,我们先来看看什么是异步非阻塞模型。如图 1-1 所示,展示了两个小人通信的场景:
图 1-1 两个小人通信
假设左侧小人发起请求,而右侧小人处理请求并发送响应:左侧小人先投出一个苹果 request,被右侧小人接收到;右侧小人进行处理后,再投出苹果 response,被左侧小人接收到。我们考察左侧小人在等待响应期间的行为,根据他在等待 response 期间是否能处理其他工作,将其归纳为“同步阻塞”和“异步非阻塞”两种模式。
首先我们看看同步阻塞式通信的流程,如图 1-2a 所示。
图 1-2a 同步阻塞式通信
接下来我们看看异步非阻塞式通信的流程,如图 1-2b 所示。
图 1-2b 异步非阻塞式通信
这里的大喇叭可以用 NIO 或 AIO 来实现。简单来说,NIO 是指不停地轮询每个盘子,一旦看到苹果就发出通知;AIO 是指在收到苹果时直接触发通知,而没有轮询的过程。
当然,本系列文章的读者并不需要了解更多实现细节,只需知道异步非阻塞模式依赖于“大喇叭”来实现,它替代小人等待接收 response,从而解放小人去处理其他工作。
根据上面的分析,同步模式具有下列严重缺点:
我们考虑下面两种情况,如图 1-3 所示。
a. channel 复用,即左侧小人在一个 channel 上连续发送多条消息。在同步阻塞模式下,一轮(即请求 + 响应)只能投递一个请求(苹果 1),而后续请求(苹果 2-4)都只能排队等待,右侧小人需要等待很多轮才能收到所期望的全部消息。此外,左侧小人在等待接收某个 response 期间,没有机会处理收到的其他消息,造成了数据处理的延迟。不得不感慨,左侧小人太懒惰了!图片
图 1-3a channel 复用
b. 线程复用,即一个线程(小人)向多个 channel 发送消息(苹果 1-3,分别发向不同 channel)。左侧小人同一时刻只能做一件事,要么在工作,要么在休息;他投递了苹果 1 后就躺下休息,等待响应,全然不顾右侧小人 2、3 还在等待他们想要的苹果 2、3。图片
图 1-3b 线程复用
在这一章里我们用漫画的形式,初步体验了同步阻塞模式与异步非阻塞模式,并分析了两种模式的区别。接下来我们从 Java 线程入手,对两种模式进行更加正式、更加贴近实际的分析。
2异步非阻塞模型
2.1Java 线程状态
在 Java 程序中,线程是调度执行的单元。线程可以获得 CPU 使用权来执行代码,从而完成有意义的工作。工作进行期间,有时会因为等待获取锁、等待网络 IO 等原因而暂停,通称“同步”或“阻塞”;如果多项工作能够同时进行,之间不存在约束、不需要互相等待,这种情况就称为“异步”或“非阻塞”。
受限于内存、系统线程数、上下文切换开销,Java 程序并不能无限创建线程;因此,我们只能创建有限个线程,并尽量提高线程的利用率,即增加其工作时长、降低阻塞时长。异步非阻塞模型是减少阻塞、提高线程利用率的有效手段。当然,这种模型并不能消除所有的阻塞。我们首先来看看 Java 线程有哪些状态 [C],其中哪些阻塞是必要的,哪些阻塞可以避免。
Java 线程状态包括:
如图 2-1a,线程如果在执行纯内存运算,那么处于 RUNNABLE 状态
根据是否获得 cpu 使用权,又分为两个子状态:READY、RUNNING
如图 2-1b、2-1c、2-1d,根据阻塞原因,线程处于下列状态之一:
图 2-1 Java 线程状态
此外,如果 Java 线程正在进行网络 IO,则线程状态为 RUNNABLE,但是实际上也发生了阻塞。以 socket 编程为例,如图 2-2 所示,在收到数据之前 InputStream.read() 会阻塞,此时线程状态为 RUNNABLE。
图 2-2 网络 IO
综上,Java 线程状态包括:RUNNABLE、BLOCKED、WAITING、TIMED_WAITING。其中,RUNNABLE 状态又分为内存计算(非阻塞)、网络 IO(阻塞)两种情况,而其余状态都是阻塞的。
根据阻塞原因,本文将 Java 线程状态归纳为以下 3 类:RUNNABLE、IO、BLOCKED
要提高线程利用率,就要增加线程处于 RUNNABLE 状态的时长,降低处于 IO 和 BLOCKED 状态的时长。BLOCKED 状态一般是不可避免的,因为线程间需要通信,需要对临界区进行并发控制;但是,如果采用适当的线程模型,那么 IO 状态的时长就可以得到降低,而这就是异步非阻塞模型。
2.2 线程模型:阻塞 vs 非阻塞
异步非阻塞模型能够降低 IO 阻塞时长,提高线程利用率。下面以数据库访问为例,分析同步和异步 API 的线程模型。如图 3 所示,过程中涉及 3 个函数:
同步 API 如图 2-3a 所示:调用者首先发送请求,然后在网络连接上等待来自服务器的响应数据。API 会一直阻塞,直至收到响应才返回;期间调用者线程无法执行其他操作,即使该操作并不依赖服务器响应。实际的执行顺序为:
异步 API 如图 2-3b 所示:调用者发送请求并注册回调,然后 API 立刻返回,接下来调用者可以执行任意操作。稍后底层网络连接收到响应数据,触发调用者所注册的回调。实际的执行顺序为:
图 2-3 同步 API & 异步 API
在上述过程中,函数 doOtherThings() 并不依赖服务器响应,原则上可以和数据库访问同时执行。然而对于同步 API,调用者被迫等待服务器响应,然后才可以执行 doOtherThings();即数据库访问期间线程阻塞于 IO 状态,无法执行其他有用的操作,利用率十分低下。而异步 API 就没有这个限制,显得更加紧凑、高效。
在 IO 密集型系统中,适当使用异步非阻塞模型,可以提升数据库访问吞吐量。
考虑这样一个场景:需要执行多条数据库访问请求,且请求之间互相独立,无依赖关系。使用同步 API 和异步 API ,线程状态随时间变化的过程如图 2-4 所示。
图 2-4 线程时间线:数据库访问
线程交替处于 RUNNABLE 和 IO 状态。在 RUNNABLE 状态下,线程执行内存计算,如提交请求、处理响应。在 IO 状态下,线程在网络连接上等待响应数据。在实际系统中,内存计算的速度非常快,RUNNABLE 状态的时长基本可忽略;而网络传输的耗时会相对更长(几十到几百毫秒),IO 状态的时长更加可观。
异步非阻塞模式在 IO 密集型系统中应用非常广泛。常用的中间件,如 http 请求 [D]、redis[E]、mongo DB[F]、elasticsearch[G]、influx DB[H],都支持异步 API。各位读者可以在参考文献中,查阅这些异步 API 的样例代码。关于中间件的异步 API ,下面有几个注意事项:
最后,异步 API 有多种实现,包括线程池、select(如 netty 4.x[L])、epoll 等。其共同点是调用者不需要在某一条网络连接上阻塞,以等待接收数据;相反,API 底层常驻有限数目的线程,当收到数据后,某一线程得到通知并触发回调。这种模型也称为“响应式”模型,非常贴切。限于篇幅原因,本文主要关注异步 API 设计,而不深入讲解异步 API 的实现原理。
3Promise 设计模式
3.1 API 形式:同步、异步 listener、异步 Promise
上一章介绍了异步非阻塞模式和异步 API 的函数形式。异步 API 具有以下特征:
在保留异步特性的基础上,异步 API 的形式可以进一步优化。上一章图 2-3b 展示了异步 API 的 listener 版本,特点是在提交请求时必须注册恰好一个回调;因而在下列场景下,listener API 会难以满足功能需求,需要调用者做进一步处理:
为了应对上述场景,我们可以使用 Promise 设计模式来重构异步 API ,以支持多个回调和同步调用。下面对同步 API、异步 listener API、异步 Promise API 的函数形式进行对比,如图 3-1 所示:
图 3-1 API 形式:同步、异步 listener、异步 Promise
综上,Promise API 在保持异步特性的前提下,提供了更高的灵活性。调用者可以自由选择函数是否阻塞,以及注册任意数目的回调。
3.2 Promise 的特性与实现
上一节介绍了 Promise API 的使用样例,其核心是一个 Promise 对象,支持注册 listener,以及同步获取响应 result;而本节将对 Promise 的功能进行更加详细的定义。注意,本节并不限定 Promise 的某一具体实现(例:jdk CompletableFuture、netty DefaultPromise),只展示共有的、必须具备的特性;缺少这些特性,Promise 将无法完成异步传递响应数据的工作。
3.2.1 功能特性
Promise 的基本功能是传递响应数据,需要支持下列方法,如表 3-1 所示:
下面以上一小节的数据库访问 API 为例,演示 Promise 的工作流程,如图 3-2 所示:
图 3-2a 提交数据库访问请求
图 3-2b 创建 Promise 对象
图 3-2c 通知 Promise 对象
Promise 的方法需要保证以下时序。此处以“A 对 B 可见”来描述时序,即:如果先执行操作 A(注册 listener)就会产生某种永久效应(永久记录这个 listener),之后再执行操作 B(通知 result)就必须考虑到这种效应,执行相应的处理(触发之前记录的 listener)。
此外,某次 await(listener) 最好对后续 await(listener) 可见,以保证 listener 严格按照注册顺序来触发。
如不考虑线程安全,那么下列代码清单可以实现 Promise 的基本特性;线程安全的实现见下一小节。代码清单依次展示了 await(listener): void、signalAll(result)、await(): result 的实现。这里有几个注意事项:
public class Promise<T> {
private boolean isSignaled = false; private T result;
private final List<Consumer<T>> listeners = new LinkedList<>();
public void await(Consumer<T> listener) { if (isSignaled) { listener.accept(result); return; }
listeners.add(listener); }
public void signalAll(T result) { if (isSignaled) { return; }
this.result = result; isSignaled = true; for (Consumer<T> listener : listeners) { listener.accept(result); } listeners.clear(); }
public T await() { // 适当阻塞,直至signalAll()被调用;实际实现见3.2.2节 return result; }}
3.2.2 线程安全特性
上一章 3.2.1 节讲解了 Promise 的功能,并提供了非线程安全的实现。本节展示如何使用并发工具,实现线程安全的 Promise,如下所示。有下列几个注意事项:
public class Promise<T> {
private final ReentrantLock lock = new ReentrantLock(); private final Condition resultCondition = lock.newCondition();
private boolean isSignaled = false; private T result;
private final List<Consumer<T>> listeners = new LinkedList<>();
public void await(Consumer<T> listener) { lock.lock(); if (isSignaled) { lock.unlock(); // 不要忘记放锁 listener.accept(result); // 在临界区外触发listener return; }
listeners.add(listener); lock.unlock(); }
public void signalAll(T result) { lock.lock(); if (isSignaled) { lock.unlock(); // 不要忘记放锁 return; }
this.result = result; isSignaled = true;
// this.listeners的副本 List<Consumer<T>> listeners = new ArrayList<>(this.listeners); this.listeners.clear(); lock.unlock();
for (Consumer<T> listener : listeners) { listener.accept(result); // 在临界区外触发listener }
/* 操作Condition时须上锁*/ lock.lock(); resultCondition.signalAll(); lock.unlock(); }
public T await() { lock.lock(); if (isSignaled) { lock.unlock(); // 不要忘记放锁 return result; }
while (!isSignaled) { resultCondition.awaitUninterruptibly(); } lock.unlock();
return result; }}
上述实现仅做演示使用,仍有较大的改进空间。生产环境的实现原理,读者可以参考 jdk CompletableFutre、netty DefaultPromise 。可以改进的地方包括:
3.2.3 须避免的特性
前面的小节展示了 Promise 的特性与实现原理。纯正的 Promise 是异步传递响应数据的工具,其应当只实现必要的数据传递特性,而不应当夹杂请求提交、数据处理等逻辑。接下来我们来看一看,Promise 在实现时应避免哪些特性,以防限制调用者所能做出的决策。
以连续提交数据库请求为例。如图 3-3a 所示,调用者调用了一个异步 API,连续提交 3 次写入请求,并在所返回的 Promise 上注册回调。
我们考察 writeAsync() 与 await() 如发生阻塞,将会对调用者造成什么影响,如图 3-3b 所示。提交请求是纯内存操作,线程处于 RUNNABLE 状态;writeAsync() 或 await() 如果发生阻塞,则线程处于 BLOCKED 状态,暂停工作而无法执行后续操作。当发生阻塞时,调用者每提交一个请求就不得不等待一段时间,从而降低了提交请求的频率,进而推迟了服务器对这些请求的响应,使得系统的吞吐量降低、延迟上升。特别地,如果系统采用了多路复用机制,即一个线程可以处理多个网络连接或多个请求,那么线程阻塞将会严重拖慢后续请求的处理,造成比较难排查的故障。
常见的阻塞原因包括:
图 3-3a 连续提交请求
图 3-3b 请求处理时间线
综上,如果绑定了线程池,Promise 就实现了对其他模型(如响应式模型)的兼容性。
图 3-4 线程时间线:线程池 vs 响应式
以数据库访问为例,现代数据库一般支持批量读写,以略微提升单次访问的延迟为代价,换来吞吐量显著提升;如果吞吐量得到提升,那么平均延迟反而会下降。下面的代码片段展示了一个批量请求 API :数据对象 BulkRequest 可以携带多条普通请求 Request ,从而实现批量提交。
/* 提交单条请求*/client.submit(new Request(1));client.submit(new Request(2));client.submit(new Request(3));
/* 提交批量请求*/client.submit(new BulkRequest( new Request(1), new Request(2), new Request(3)));
为了充分利用“批量请求”的特性,调用者需要进行跨越多条请求的“宏观调控”。请求产生后可以先缓存起来;等待一段时间后,取出所缓存的多条请求,组装一个批量请求来一并提交。因此,如下面的代码片段所示,在构造 Promise 时指定如何提交单条请求是没有意义的,这部分代码(client.submit(new Request(...)))并不会被执行;而实际希望执行的代码,其实是提交批量请求(client.submit(new BulkRequest(...)))。
/* Promise:提交单条请求*/new Promise<>(() -> client.submit(new Request(1)));new Promise<>(() -> client.submit(new Request(2)));new Promise<>(() -> client.submit(new Request(3)));
/* 定义如何处理响应数据*/Promise<String> promise = new Promise<>(result -> process(result));
/* 其他代码也关心响应数据*/promise.await(result -> process1(result));promise.await(result -> process2(result));
综上,Promise 应该是一个纯粹的数据对象,其职责是存储回调函数、存储响应数据;同时做好时序控制,保证触发回调函数无遗漏、保证触发顺序。除此以外,Promise 不应该和任何实现策略相耦合,不应该杂糅提交请求、处理响应的逻辑。
4总结
本文讲解了异步非阻塞设计模式,并对同步 API、异步 listener API、异步 Promise API 进行了对比。相比于其他两种 API,Promise API 具有无可比拟的灵活性,调用者可以自由决定同步返回还是异步返回,并允许对响应数据注册多个回调函数。最后,本文讲解了 Promise 基本功能的实现,并初步实现了线程安全特性。
本系列共 2 篇文章,本文为第 1 篇《原理篇》。在下一篇《应用篇》中,我们将看到 Promise 设计模式丰富的应用场景,将其和现有工具进行结合或对比,以及对 Promise API 进行进一步变形和封装,提供异常处理、调度策略等特性。
如果小伙伴们想要进一步交流,可以添加有道技术团队助手(ydtech01)与我们联络。
5参考文献
[A] 异步非阻塞 IO
https://en.wikipedia.org/wiki/Asynchronous_I/O
[B] Promise
https://en.wikipedia.org/wiki/Futures_and_promises
[C] java 线程状态
https://segmentfault.com/a/1190000038392244
[D] http 异步 API 样例:apache HttpAsyncClient
https://hc.apache.org/httpcomponents-asyncclient-4.1.x/quickstart.html
[E] redis 异步 API 样例:lettuce
https://github.com/lettuce-io/lettuce-core/wiki/Asynchronous-API
[F] mongo DB 异步 API 样例:AsyncMongoClient
https://mongodb.github.io/mongo-java-driver/3.0/driver-async/getting-started/quick-tour/
[G] elasticsearch 异步 API 样例:RestHighLevelClient
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-index.html
[H] influx DB 异步 API 样例:influxdb-java
https://github.com/influxdata/influxdb-java/blob/master/MANUAL.md
[I] jedis vs lettuce
https://redislabs.com/blog/jedis-vs-lettuce-an-exploration/
[J] kafka
http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html
[K] KafkaProducer.send() 阻塞
https://stackoverflow.com/questions/57140680/kafka-asynchronous-send-not-really-asynchronous
[L] netty
https://netty.io/wiki/user-guide-for-4.x.html
本周好文推荐
七部委进驻滴滴;活久见!腾讯阿里讲和;京东宣布全员涨薪两个月;IPIP.net状告阿里云 | Q资讯
InfoQ 写作平台欢迎所有热爱技术、热爱创作、热爱分享的内容创作者入驻!
还有更多超值活动等你来!
扫描下方二维码
填写申请,成为作者
开启你的创作之路吧~
点个在看少个 bug 👇