Java 并发编程是整个 Java 开发体系中最难以理解但也是最重要的知识点,也是各类开源分布式框架(如 ZooKeeper、Kafka、Spring Cloud、Netty 等)中各个并发组件实现的基础。J.U.C 并发包,即 java.util.concurrent 包,大大提高了并发性能,是 JDK 的核心工具包,是 JDK 1.5 之后,由 Doug Lea 实现并引入。而 AQS 被认为是 J.U.C 的核心。
AQS 是一个抽象类,并没有对并发类提供了一个统一的接口定义,而是由子类根据自身的情况实现相应的方法,AQS 中一般包含两个方法 acquire(int)、release(int),获取同步状态和释放同步状态,AQS 根据其状态是否独占分为独占模式和共享模式。
同步器根据同步状态分为独占模式和共享模式,独占模式包括类:ReentrantLock、ReentrantReadWriteLock.WriteLock,共享模式包括:Semaphore、CountDownLatch、ReentrantReadWriteLock.ReadLock,本文将着重介绍一下 java.util.concurrent 包下一些辅助同步器类:CountDownLatch、CyclicBarrier、Semaphore、Exchanger、Phaser。
CountDownLatch 是一个同步辅助工具类,通过它可以完成类似于阻塞当前线程的功能,也就是一个或多个线程一直等待直到其他线程执行完成。即允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。
CountDownLatch 用了一个给定的计数器 cnt 来进行初始化,该计数器的操作是原子操作,即同时只能有一个线程操作该计数器,调用该类 await 方法的线程会一直处于阻塞状态,直到其他线程调用 countDown 方法时计数器的值变成 0,每次调用 countDown 时计数器的值会减 1,当计数器的值为 0 时所有因 await 方法而处于等待状态的线程就会继续执行。计数器 cnt 是闭锁需要等待的线程数量,只能被设置一次,且 CountDownLatch 没有提供任何机制去重新设置计数器 count,如果需要重置,可以考虑使用 CyclicBarrier。
CountdownLatch_example
(1)开启多个线程分块下载一个大文件,每个线程只下载固定的一截,最后由另外一个线程来拼接所有的分段。
(2)应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
(3)确保一个计算不会执行,直到所需要的资源被初始化。
(4)并行计算,处理量很大时可以将运算任务拆分成多个子任务,当所有子任务都完成之后,父任务再将所有子任务都结果进行汇总。
CountDownLatch 内部依赖 Sync 实现,而 Sync 继承 AQS。CountDownLatch 关键接口如下:
(1)作为一个开关 / 入口
将初始计数值为 1 的 CountDownLatch 作为一个的开关或入口,在调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。
public class Driver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) {
// 初始化计数器为 10 的 CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(new Worker(countDownLatch));
}
// 主线程执行
doSomething();
// 主线程开启开关
countDownLatch.countDown();
// 平滑地关闭 ExecutorService
executorService.shutdown();
}
private static void doSomething() {
// ...
System.out.print("start..");
}
}
class Worker implements Runnable {
private final CountDownLatch countDownLatch;
Worker(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
// 所有执行线程在此处等待开关开启 [多个子线程同时执行]
countDownLatch.await();
// 子线程执行
doWork();
} catch (InterruptedException ignored) {
}
}
private void doWork() {
// ...
System.out.print("run..");
}
}
// Output
// start..run..run..run..run..run..run..run..run..run..run..
(2)作为一个完成信号
将初始计数值为 N 的 CountDownLatch 作为一个完成信号点,使某个线程在其它 N 个线程完成某项操作之前一直等待。
public class Driver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws InterruptedException {
// 初始化计数器为 10 的 CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(WORK_COUNT);
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(new Worker(countDownLatch));
}
// 主线程等待其它 N 个线程完成
countDownLatch.await();
// 主线程执行
doSomething();
// 平滑地关闭 ExecutorService
executorService.shutdown();
}
private static void doSomething() {
// ...
System.out.println("end");
}
}
class Worker implements Runnable {
private final CountDownLatch countDownLatch;
Worker(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
// 子线程执行
doWork();
// 每个线程做完自己的事情后, 就将计数器减去 1
countDownLatch.countDown();
}
private void doWork() {
// ...
System.out.print("run..");
}
}
// Output
// run..run..run..run..run..run..run..run..run..run..end
0x02: CyclicBarrier - 循环栅栏
CyclicBarrier 和 CountDownLatch 是非常类似的,CyclicBarrier 核心的概念是在于设置一个等待线程的数量边界,到达了此边界之后进行执行。CyclicBarrier 也是一个同步辅助工具类,它允许一组线程相互等待直到到达某个公共的屏障点(Common Barrier Point),通过它可以完成多个线程之间相互等待时,只有当每个线程都准备就绪后才能各自继续执行后面的操作。
CyclicBarrier 也是通过计数器来实现,当某个线程调用 await 方法后就进入等待状态,计数器执行加一操作。当计数器的值达到了设置的初始值时等待状态的线程会被唤醒继续执行。通过调用 CyclicBarrier 对象的 await() 方法,两个线程可以实现互相等待。一旦 N 个线程在等待 CyclicBarrier 达成,所有线程将被释放掉去继续执行。由于 CyclicBarrier 在释放等待线程后可以重用,所以可以称之为循环栅栏。
CyclicBarrier 特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。
CyclicBarrier 并没有自己去实现 AQS 框架的 API,而是利用了 ReentrantLock 和 Condition。
CyclicBarrier 提供的关键方法如下:
CyclicBarrier 提供的两个构造函数:
(1)简单例子
public class Solver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) {
// 初始化计数器为 10 的 CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(WORK_COUNT);
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(new Worker(cyclicBarrier));
}
// 平滑地关闭 ExecutorService
executorService.shutdown();
}
}
class Worker implements Runnable {
private final CyclicBarrier cyclicBarrier;
Worker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.print("before..");
try {
// 多个线程之间相互等待时,只有当每个线程都准备就绪后才能各自继续执行后面的操作
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
// 子线程执行
doWork();
}
private void doWork() {
// ...
System.out.print("after..");
}
}
// Output
// before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
(2)执行 barrierAction
在 ready 状态时日志是每秒输出一条,当有 5 条 ready 时会一次性输出 5 条 continue。这就是前面讲的全部线程准备就绪后同时开始执行。在初始化 CyclicBarrier 时还可以在等待线程数后指定一个 runnable,含义是当线程到达这个屏障时优先执行这里的 runnable。
public class Solver {
private static final Integer WORK_COUNT = 10;
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws InterruptedException {
// 初始化计数器为 5 的 CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println(String.format("%s call back is ready.", Thread.currentThread().getName())));
for (int i = 0; i < WORK_COUNT; i++) {
Thread.sleep(1000);
executorService.execute(new Worker(cyclicBarrier));
}
// 平滑地关闭 ExecutorService
executorService.shutdown();
}
}
class Worker implements Runnable {
private final CyclicBarrier cyclicBarrier;
Worker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(String.format("%s is ready", Thread.currentThread().getName()));
cyclicBarrier.await();
System.out.println(String.format("%s continue", Thread.currentThread().getName()));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
// Output
// pool-1-thread-1 is ready
// pool-1-thread-2 is ready
// pool-1-thread-3 is ready
// pool-1-thread-4 is ready
// pool-1-thread-5 is ready
// pool-1-thread-5 call back is ready.
// pool-1-thread-5 continue
// pool-1-thread-3 continue
// pool-1-thread-4 continue
// pool-1-thread-1 continue
// pool-1-thread-2 continue
// pool-1-thread-5 is ready
// pool-1-thread-1 is ready
// pool-1-thread-4 is ready
// pool-1-thread-2 is ready
// pool-1-thread-3 is ready
// pool-1-thread-3 call back is ready.
// pool-1-thread-3 continue
// pool-1-thread-5 continue
// pool-1-thread-4 continue
// pool-1-thread-2 continue
// pool-1-thread-1 continue
CyclicBarrier 与 CountDownLatch 可能容易混淆,我们强调下其区别:
Semaphore,又名信号量,这个类的作用有点类似于 “许可证”。信号量 Semaphore 是一个控制访问多个共享资源的计数器,和 CountDownLatch 一样,其本质上是一个 “共享锁”。从源码角度来看,Semaphore 的实现方式和 CountDownLatch 非常相似,基于 AQS 做了一些定制。通过维持 AQS 的锁全局计数 state 字段来实现定量锁的加锁和解锁操作。Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
有时,我们因为一些原因需要控制同时访问共享资源的最大线程数量,比如出于系统性能的考虑需要限流,或者共享资源是稀缺资源,我们需要有一种办法能够协调各个线程,以保证合理的使用公共资源。当有线程想要访问共享资源时,需要先获取 (acquire) 的许可;如果许可不够了,线程需要一直等待,直到许可可用。当线程使用完共享资源后,可以归还 (release) 许可,以供其它需要的线程使用;然而,实际上并没有真实的许可证对象供线程使用,Semaphore 只是对可用的数量进行管理维护。
Semaphore 可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。
Semaphore 内部包含公平锁(FairSync)和非公平锁(NonfairSync),继承内部类 Sync,其中 Sync 继承 AQS,作为 Semaphore 的公平锁和非公平锁的基类。
CyclicBarrier 提供的关键方法如下:
Semaphore 提供了两个构造函数:
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(()->{
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
// Output
// 2 1 2 2 2 2 2 1 2 2
Exchanger(交换器)是一个用于线程间协作的工具类,是 JDK 1.5 开始提供的一个用于两个工作线程之间交换数据的封装工具类。Exchanger 有点类似于 CyclicBarrier,我们知道 CyclicBarrier 是一个栅栏,到达栅栏的线程需要等待其它一定数量的线程到达后,才能通过栅栏,Exchanger 可以看成是一个双向栅栏。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
可简单地将 Exchanger 对象理解为一个包含两个格子的容器,通过 exchanger 方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。这两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange 方法,它会一直等待第二个线程也执行 exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
Exchanger 是最简单的也是最复杂的,简单在于 API 非常简单,就一个构造方法和两个 exchange() 方法,最复杂在于它的实现是最复杂的。
Exchanger 提供的关键方法如下:
可以看出,当一个线程到达 exchange 调用点时,如果其他线程此前已经调用了此方法,则其他线程会被调度唤醒并与之进行对象交换,然后各自返回;如果其他线程还没到达交换点,则当前线程会被挂起,直至其他线程到达才会完成交换并正常返回,或者当前线程被中断或超时返回。
public class ExchangerExample {
private static final Integer WORK_COUNT = 2;
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < WORK_COUNT; i++) {
executorService.execute(() -> {
String beforeObj = Thread.currentThread().getName();
try {
String afterObj = exchanger.exchange(Thread.currentThread().getName());
System.out.println(String.format("currentThread %s , before exchange %s , after exchange %s", Thread.currentThread().getName(), beforeObj, afterObj));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
// Output
// currentThread pool-1-thread-1 , before exchange pool-1-thread-1 , after exchange pool-1-thread-2
// currentThread pool-1-thread-2 , before exchange pool-1-thread-2 , after exchange pool-1-thread-1
CountDownLatch 和 CyclicBarrier 都是 JDK 1.5 引入的,而 Phaser 是 JDK 1.7 引入的。Phaser 的功能与 CountDownLatch 和 CyclicBarrier 有部分重叠,它几乎可以取代 CountDownLatch 和 CyclicBarrier, 其功能更灵活,更强大,支持动态调整需要控制的线程数。
CountDownLatch,闭锁,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待,它提供了 await()、countDown() 两个方法来进行操作;CyclicBarrier,循环栅栏,允许一组线程互相等待,直到到达某个公共屏障点,它提供的 await() 可以实现让所有参与者在临界点到来之前一直处于等待状态;Phaser,多阶段栅栏,它把多个线程协作执行的任务划分为多个阶段,编程时需要明确各个阶段的任务,每个阶段都可以有任意个参与者,线程都可以随时注册并参与到某个阶段,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance)。
Phaser 顾名思义,与阶段相关。Phaser 比较适合这样一种场景,一种任务可以分为多个阶段,现希望多个线程去处理该批任务,对于每个阶段,多个线程可以并发进行,但是希望保证只有前面一个阶段的任务完成之后才能开始后面的任务。这种场景可以使用多个 CyclicBarrier 来实现,每个 CyclicBarrier 负责等待一个阶段的任务全部完成。但是使用 CyclicBarrier 的缺点在于,需要明确知道总共有多少个阶段,同时并行的任务数需要提前预定义好,且无法动态修改。而 Phaser 可同时解决这两个问题。
Phaser 主要接口如下:
Phaser 提供的关键方法如下:
(1)通过 Phaser 实现 CyclicBarrier 控制多个线程的执行时机的功能
通过 Phaser 控制多个线程的执行时机:有时候我们希望所有线程到达指定点后再同时开始执行,我们可以利用 CyclicBarrier 来实现,这里给出使用 Phaser 的版本。
public class PhaserExample {
public static void main(String[] args) {
final int totalRequestCount = 10;
Phaser phaser = new Phaser();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
phaser.register();
executorService.execute(() -> {
// 等待其它参与者线程到达 [arriveAndAwaitAdvance 方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance 方法也不会返回或抛出异常,而是继续等待。如果希望能够响应中断,可以参考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
});
}
executorService.shutdown();
}
}
// Output
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-7, Executing the task, currentPhase:1
// currentThread:pool-1-thread-4, Executing the task, currentPhase:1
// currentThread:pool-1-thread-6, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// currentThread:pool-1-thread-10, Executing the task, currentPhase:1
// currentThread:pool-1-thread-9, Executing the task, currentPhase:1
// currentThread:pool-1-thread-5, Executing the task, currentPhase:1
// currentThread:pool-1-thread-8, Executing the task, currentPhase:1
(2)通过 Phaser 实现 CyclicBarrier 执行 barrierAction
CyclicBarrier 支持 barrier action, Phaser 同样也支持。不同之处是 Phaser 的 barrier action 需要改写 onAdvance 方法来进行定制。
public class PhaserExample {
public static void main(String[] args) {
final int totalRequestCount = 10;
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(String.format("%s call back is ready.", Thread.currentThread().getName()));
return super.onAdvance(phase, registeredParties);
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
// 注册各个参与者线程
phaser.register();
executorService.execute(() -> {
// 等待其它参与者线程到达 [arriveAndAwaitAdvance 方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance 方法也不会返回或抛出异常,而是继续等待。如果希望能够响应中断,可以参考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
});
}
executorService.shutdown();
}
}
// Output
// pool-1-thread-10 call back is ready.
// currentThread:pool-1-thread-10, Executing the task, currentPhase:1
// currentThread:pool-1-thread-9, Executing the task, currentPhase:1
// currentThread:pool-1-thread-7, Executing the task, currentPhase:1
// currentThread:pool-1-thread-8, Executing the task, currentPhase:1
// currentThread:pool-1-thread-5, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-6, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-4, Executing the task, currentPhase:1
(3)通过 Phaser 实现 CountDownLatch 作为一个开关 / 入口功能
public class PhaserExample {
public static void main(String[] args) throws IOException {
final int totalRequestCount = 10;
// 注册主线程, 当外部条件满足时, 由主线程打开开关
Phaser phaser = new Phaser(1);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
// 注册各个参与者线程
phaser.register();
executorService.execute(() -> {
// 等待其它参与者线程到达 [arriveAndAwaitAdvance 方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance 方法也不会返回或抛出异常,而是继续等待。如果希望能够响应中断,可以参考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
});
}
// 打开开关 [parties 共 11 个, 主线程从之后需要等待的成员中移除, 即 parties 还剩 10]
phaser.arriveAndDeregister();
System.out.println("主线程打开了开关");
executorService.shutdown();
}
}
// Output
// 主线程打开了开关
// currentThread:pool-1-thread-6, Executing the task, currentPhase:1
// currentThread:pool-1-thread-7, Executing the task, currentPhase:1
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-8, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// currentThread:pool-1-thread-9, Executing the task, currentPhase:1
// currentThread:pool-1-thread-4, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-5, Executing the task, currentPhase:1
(4)通过 Phaser 实现分层
public class PhaserExample {
public static void main(String[] args) {
final int parties = 3;
final int phases = 4;
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("====== Phase :" + phase + "======");
return super.onAdvance(phase, registeredParties);
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < parties; i++) {
// 注册各个参与者线程
phaser.register();
executorService.execute(() -> {
for (int phase = 0; phase < phases; phase++) {
// 等待其它参与者线程到达 [arriveAndAwaitAdvance 方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance 方法也不会返回或抛出异常,而是继续等待。如果希望能够响应中断,可以参考 awaitAdvanceInterruptibly 方法]
int j = phaser.arriveAndAwaitAdvance();
// do something
System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
}
});
}
executorService.shutdown();
}
}
// Output
// ====== Phase : 0 ======
// currentThread:pool-1-thread-1, Executing the task, currentPhase:1
// currentThread:pool-1-thread-2, Executing the task, currentPhase:1
// currentThread:pool-1-thread-3, Executing the task, currentPhase:1
// ====== Phase : 1 ======
// currentThread:pool-1-thread-3, Executing the task, currentPhase:2
// currentThread:pool-1-thread-1, Executing the task, currentPhase:2
// currentThread:pool-1-thread-2, Executing the task, currentPhase:2
// ====== Phase : 2 ======
// currentThread:pool-1-thread-2, Executing the task, currentPhase:3
// currentThread:pool-1-thread-1, Executing the task, currentPhase:3
// currentThread:pool-1-thread-3, Executing the task, currentPhase:3
// ====== Phase : 3 ======
// currentThread:pool-1-thread-3, Executing the task, currentPhase:4
// currentThread:pool-1-thread-1, Executing the task, currentPhase:4
// currentThread:pool-1-thread-2, Executing the task, currentPhase:4
[1]. 【并发编程】J.U.C 之 AQS 介绍、实现及其子类使用演示 [2]. Java 进阶(四)线程间通信剖析 [3]. 透彻理解 Java 并发编程 [4]. 死磕 Java 并发
source:https://morning-pro.github.io/archives/f4f43ede.html
喜欢,在看