之前的文章中学习了J.U.C中aqs的底层实现原理,这篇文学习一下J.U.C中提供的一些线程同步工具类。
在前面学习 synchronized 的时候,有讲到 wait/notify 的基本使用,结合 synchronized 可以实现对线程的通信。既然 J.U.C 里面提供了锁的实现机制,那 J.U.C 里面有没有提供类似的线程通信的工具呢?
Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。
ConditionWait
public class ConditionDemoWait implements
Runnable {
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock,
Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoWait");
try {
lock.lock();
condition.await();
System.out.println("end - ConditionDemoWait");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}ConditionSignal
public class ConditionDemoSignal implements
Runnable {
private Lock lock;
private Condition condition;
public ConditionDemoSignal(Lock lock,
Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin - ConditionDemoSignal");
try {
lock.lock();
condition.signal();
System.out.println("end - ConditionDemoSignal");
} finally {
lock.unlock();
}
}
}
通过这个案例简单实现了 wait 和 notify 的功能,当调用await 方法后,当前线程会释放锁并等待,而其他线程调用condition 对象的 signal 或者 signalall 方法通知并被阻塞的线程,然后自己执行 unlock 释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。所以,condition 中两个最重要的方法,一个是 await,一个是 signal 方法
await:把当前线程阻塞挂起
signal:唤醒阻塞的线程
countdownlatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完毕再执行。从命名可以解读到 countdown 是倒数的意思,类似于我们倒计时的概念。
countdownlatch 提供了两个方法,一个是 countDown,一个是 await,countdownlatch 初始化的时候需要传入一个整数,在这个整数倒数到 0 之前,调用了 await 方法的程序都必须要等待,然后通过 countDown 来倒数。
public static void main(String[] args) throws
InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
System.out.println("" + Thread.currentThread().g
etName() + "-执行中");
countDownLatch.countDown();
System.out.println("" + Thread.currentThread().g
etName() + "-执行完毕");
}, "t1").start();
new Thread(() -> {
System.out.println("" + Thread.currentThread().g
etName() + "-执行中");
countDownLatch.countDown();
System.out.println("" + Thread.currentThread().g
etName() + "-执行完毕");
}, "t2").start();
new Thread(() -> {
System.out.println("" + Thread.currentThread().g
etName() + "-执行中");
countDownLatch.countDown();
System.out.println("" + Thread.currentThread().g
etName() + "-执行完毕");
}, "t3").start();
countDownLatch.await();
System.out.println("所有线程执行完毕");
}
从代码的实现来看,有点类似 join 的功能,但是比 join 更加灵活。CountDownLatch 构造函数会接收一个 int 类型的参数作为计数器的初始值,当调用 CountDownLatch 的countDown 方法时,这个计数器就会减一。通过 await 方法去阻塞去阻塞主流程。
semaphore 也就是我们常说的信号灯,semaphore 可以控制同时访问的线程个数,通过 acquire 获取一个许可,如果没有就等待,通过 release 释放一个许可。有点类似限流的作用。叫信号灯的原因也和他的用处有关,比如某商场就 5 个停车位,每个停车位只能停一辆车,如果这个时候来了 10 辆车,必须要等前面有空的车位才能进入。
public class Test {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++) {
new Car(i, semaphore).start();
}
}
static class Car extends Thread {
private int num;
private Semaphore semaphore;
public Car(int num, Semaphore
semaphore) {
this.num = num;
this.semaphore = semaphore;
}
public void run() {
try {
semaphore.acquire();//获取一个许可
System.out.println("第" + num + "占用一个停车位");
TimeUnit.SECONDS.sleep(2);
System.out.println("第" + num + "俩车走喽");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Semaphore 比较常见的就是用来做限流操作了。
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 当前线程已经到达了屏障,然后当前线程被阻塞。
当存在需要所有的子任务都完成时,才执行主任务,这个时候就可以选择使用 CyclicBarrier
public class DataImportThread extends Thread {
private CyclicBarrier cyclicBarrier;
private String path;
public DataImportThread(CyclicBarrier cyclicBarrier, String path) {
this.cyclicBarrier = cyclicBarrier;
this.path = path;
}
@Override
public void run() {
System.out.println("开始导入:"+path+" 位置的数据");
try {
cyclicBarrier.await();//阻塞
} catch
(InterruptedException e) {
e.printStackTrace();
} catch
(BrokenBarrierException e) {
e.printStackTrace();
}
}
}
1)对于指定计数值 parties,若由于某种原因,没有足够的线程调用 CyclicBarrier 的 await,则所有调用 await 的线程都会被阻塞;
2)同样的 CyclicBarrier 也可以调用 await(timeout, unit),设置超时时间,在设定时间内,如果没有足够线程到达,则解除阻塞状态,继续工作;
3)通过 reset 重置计数,会使得进入 await 的线程出现BrokenBarrierException;
4)如果采用是 CyclicBarrier(int parties, Runnable barrierAction) 构造方法,执行 barrierAction 操作的是最后一个到达的线程