在编程的竞技场中,多线程与并发是每一级大牛必备的技能!今天,就让我们深入了解Java并发武器库中的“五神兵”——AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger的强大之处。他们如棋盘上的棋子,既能彼此协调,又能独当一面,解决了无数线程之问的冲突、同步与协作难题。
::: tip 走过路过,不要错过!
点赞、评论、转发是对笔者最大的支持!是不是已经迫不及待想要揭开这五神兵的神秘面纱?阅读完本文后,学会即点即用,咱也能成为一名真正的高级并发架构师!🚀
:::
AQS是Java并发库的基础,很多Java并发类都以其为基石来构建更为复杂的同步机制。它提供了一种框架,使得线程同步更加高效。
AQS通过一个内部队列及一个state变量来管理同步状态,确保只有一个线程获取到锁。
ReentrantLock
Semaphore
CountDownLatch
CyclicBarrier
ReentrantReadWriteLock
以下是一个自定义的同步组件示例,实现了独占和共享锁功能:
class CustomMutex {
// 使用AQS实现互斥锁功能
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {}
private static final class NonfairSync extends Sync {
NonfairSync() {
setState(0); // 初始状态为0,可使用
}
// 尝试获取锁
@Override
protected boolean tryAcquire(int acquires) {
return compareAndSetState(0, 1);
}
// 释放锁
@Override
protected boolean tryRelease(int releases) {
setState(0);
return true;
}
}
public CustomMutex() {
sync = new NonfairSync();
}
// 加锁
public void lock() {
sync.acquire(1);
}
// 解锁
public void unlock() {
sync.release(1);
}
}
CountDownLatch
是AQS的实现之一,主要用于某一线程等待其他线程完成操作后才能继续执行,它允许一个或多个线程等待直到在其他线程中发生某些操作。
常用于主线程必须等待其他所有线程完成某项工作后才能继续执行的场景。
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threads = 5;
CountDownLatch latch = new CountDownLatch(threads);
for (int i = 0; i < threads; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " is doing some task");
// 模拟工作时间
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println(Thread.currentThread().getName() + " has finished its task");
// 减少计数器
latch.countDown();
}).start();
}
// 主线程等待其他线程完成
latch.await();
System.out.println("All workers have finished their tasks.");
}
}
CyclicBarrier用于线程同步,允许多个线程互相等待到达屏障点。
当指定数目的线程都调用了await
方法后,这些线程才会继续执行。
如在进行大规模并行处理时,可以使用CyclicBarrier让多个线程在某个点集结,共同执行任务。
public class CyclicBarrierExample {
public static void main(String[] args) {
int numThreads = 5;
CyclicBarrier barrier = new CyclicBarrier(numThreads);
for (int i = 0; i < numThreads; i++) {
final int threadNum = i;
new Thread(() -> {
System.out.println("Thread " + threadNum + " reaching barrier.");
try {
barrier.await(); // 等待所有线程到达屏障点
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
Semaphore
可以控制访问特定资源的线程数量。它通过设置许可数控制并发数量。
适用于控制同时访问某个特定资源的线程数量,如数据库连接池,或限流功能。
public class SemaphoreExample {
public static void main(String[] args) {
final int permits = 3; // 信号量的初始许可数量
Semaphore semaphore = new Semaphore(permits);
for (int i = 0; i < 5; i++) {
new Thread(new Worker(i, semaphore)).start();
}
}
static class Worker implements Runnable {
private final int workerId;
private final Semaphore semaphore;
Worker(int id, Semaphore semaphore) {
this.workerId = id;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Worker " + workerId + " got a permit.");
// 模拟工作耗时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println("Worker " + workerId + " released the permit.");
}
}
}
}
Exchanger
是一个用于两个线程之间交换数据的工具,通常用于生产者消费者模式。
适合于需要两个线程在不同的点互相交换数据的场景,比如在网格计算、数据分发等情形。
public class ExchangerExample {
static Exchanger<Integer> exchanger = new Exchanger<>();
public static void main(String[] args) throws InterruptedException {
Thread consumer = new Thread(() -> {
try {
System.out.println("Consumer received: " + exchanger.exchange(1));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
consumer.start();
System.out.println("Producer sends: " + exchanger.exchange(2));
}
}
AQS 是 Java 并发包中一个非常重要的抽象类,用于构建锁和同步器。多数同步类(如 ReentrantLock、CountDownLatch 等)都是基于 AQS 实现的。
AQS 通过一个 FIFO 队列来实现资源的争夺。它使用一个 state
变量来表示同步状态,通过 CAS
操作进行原子更新。
acquire(int arg)
和 release(int arg)
方法来申请和释放资源。AQS 主要用于构建锁和其他同步器,比如:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class SimpleLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
CountDownLatch 是一个同步工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
CountDownLatch 通过 AQS 的 state
来计数,调用 countDown()
方法会将 state
减 1,当 state
减到 0 时,所有等待线程会被唤醒。
CountDownLatch 适用于:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
final int threadCount = 3;
final CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " is running");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
}
latch.await();
System.out.println("All threads have finished.");
}
}
CyclicBarrier 是一个同步工具类,它允许一组线程互相等待,直到所有线程都到达某个屏障点。
CyclicBarrier 使用一个计数器来跟踪到达屏障的线程数量。当计数器达到预设值时,所有等待的线程会被唤醒,并且计数器被重置。
CyclicBarrier 适用于:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
private static final int THREAD_COUNT = 3;
private static final CyclicBarrier BARRIER = new CyclicBarrier(THREAD_COUNT, () -> {
System.out.println("All threads have reached the barrier. Let's proceed.");
});
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(new Task()).start();
}
}
static class Task implements Runnable {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " is performing some work.");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " has reached the barrier.");
BARRIER.await();
System.out.println(Thread.currentThread().getName() + " is proceeding with further work.");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
Semaphore 是一个计数信号量,用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。
Semaphore 通过内部的计数器来控制资源的访问数量。调用 acquire()
方法会尝试获取一个许可,计数器减 1;调用 release()
方法会释放一个许可,计数器加 1。
Semaphore 适用于:
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
private static final int PERMITS = 3;
private static final Semaphore SEMAPHORE = new Semaphore(PERMITS);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new Task()).start();
}
}
static class Task implements Runnable {
@Override
public void run() {
try {
SEMAPHORE.acquire();
System.out.println(Thread.currentThread().getName() + " acquired a permit.");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " released a permit.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
SEMAPHORE.release();
}
}
}
}
Exchanger 是一个用于线程间数据交换的同步点。两个线程可以在此交换数据,每个线程通过 exchange
方法交换彼此的数据。
Exchanger 通过一个内部的交换槽(exchange slot)来实现数据交换。当一个线程调用 exchange()
方法时,会等待另一个线程也调用 exchange()
方法,然后互相交换数据。
Exchanger 适用于:
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
private static final Exchanger<String> EXCHANGER = new Exchanger<>();
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
static class Producer implements Runnable {
@Override
public void run() {
try {
String data = "Data from Producer";
System.out.println("Producer is exchanging data: " + data);
String receivedData = EXCHANGER.exchange(data);
System.out.println("Producer received data: " + receivedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
String data = "Data from Consumer";
System.out.println("Consumer is exchanging data: " + data);
String receivedData = EXCHANGER.exchange(data);
System.out.println("Consumer received data: " + receivedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
AQS 是 Java 并发包的基石,它为实现锁和同步器提供了一个框架。AQS 通过维护一个 FIFO 队列来管理线程的等待状态,并通过 state
变量控制同步状态。
CountDownLatch 是一个简单而强大的等待机制,适用于一组线程等待另一个线程完成工作。
CyclicBarrier 允许一组线程互相等待,直到所有线程都到达某个屏障点。它可以被重复使用。
BrokenBarrierException
等异常。Semaphore 控制同时访问某资源的线程数量,可以用于实现资源池等场景。
Exchanger 用于两个线程之间的数据交换。
在实际项目中,我们需要根据不同的需求选择合适的同步工具。下面总结了这些工具的适用场景:
工具 | 适用场景 |
---|---|
AQS | 构建自定义同步器、锁等复杂同步机制。 |
CountDownLatch | 一个或多个线程等待其他线程完成工作,适用于一次性任务。 |
CyclicBarrier | 一组线程需要在某个点上相互等待,适用于循环使用的场景。 |
Semaphore | 控制资源的并发访问数量,如连接池、限流等。 |
Exchanger | 两个线程之间的数据交换,如数据传递、缓冲区交换等。 |
在实际项目中,我们可能会遇到需要控制并发访问、协调线程操作的复杂场景。下面我们用一个模拟高并发访问资源的例子来演示如何综合运用这些同步工具。
假设我们有一个高并发访问的资源(如数据库连接),我们需要:
import java.util.concurrent.*;
public class HighConcurrencyDemo {
private static final int THREAD_COUNT = 10;
private static final int PERMITS = 3;
private static final CountDownLatch INIT_LATCH = new CountDownLatch(THREAD_COUNT);
private static final CyclicBarrier BARRIER = new CyclicBarrier(THREAD_COUNT);
private static final Semaphore SEMAPHORE = new Semaphore(PERMITS);
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
executor.submit(new Worker());
}
executor.shutdown();
}
static class Worker implements Runnable {
@Override
public void run() {
try {
// 模拟初始化工作
System.out.println(Thread.currentThread().getName() + " is initializing.");
Thread.sleep((long) (Math.random() * 1000));
INIT_LATCH.countDown();
INIT_LATCH.await();
// 等待其他线程到达屏障
System.out.println(Thread.currentThread().getName() + " is waiting at the barrier.");
BARRIER.await();
// 控制访问资源的并发数量
SEMAPHORE.acquire();
System.out.println(Thread.currentThread().getName() + " is accessing the resource.");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " has finished accessing the resource.");
SEMAPHORE.release();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
countDown()
方法。await()
方法等待所有初始化工作完成。await()
方法等待其他线程到达屏障点。Semaphore
来控制同时访问资源的线程数量。acquire()
方法获取许可,访问完资源后通过 release()
方法释放许可。运行上述代码,你会看到各个线程按顺序完成初始化、等待在屏障点、并发访问资源的过程。控制台输出将显示线程的执行步骤,便于理解线程间的协作和同步机制。
这篇文章详细介绍了 Java 并发编程中的五大核心工具类:AQS、CountDownLatch、CyclicBarrier、Semaphore 和 Exchanger。我们不仅讨论了它们的底层原理和应用场景,还通过代码示例展示了如何在实际项目中使用这些工具。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。