CountDownLatch是JUC包下的一个工具类,允许一个或多个线程等待,直到其他线程中执行的一个放行操作完后,等待线程才会继续往下执行的同步辅助。
可用于一个或多个线程中等待其他线程完成某项操作后再运行的场景。
首先看一下其用法: 1、先创建一个CountDownLatch的实例
参数是计数器(可以设置大于1的),也就是必须要设定的线程执行完后等待线程才会往下执行。
设置一个共享变量sharedNum,初始值为0
int sharedNum = 0;
CountDownLatch countDownLatch = new CountDownLatch(1);
2、创建多个线程,此处设置两个线程,分别是A和B,下面代码的意思是A线程必须等待B线程执行完后,A线程才会往下执行
new Thread(new Runnable() {
@Override
public void run() {
try {
1、调用await方法,使线程阻塞
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"获取sharedNum的值为"+sharedNum);
}
},"A").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sharedNum = 2;
System.out.println(Thread.currentThread().getName()+"将sharedNum设置为2");
2、调用countDown方法,将技术器减1
countDownLatch.countDown();
}
},"B").start();
上述代码执行结果始终都是:
接下来看一下其源代码:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
底层使用Sync类,Sync是AQS的子类,AQS里面维护了锁的状态state字段和线程等待队列。AQS是并发包里面很重要的一个抽象类。
class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
}
void setState(int newState) {
state = newState;
}
上述可以看到,设定的计数值是直接设置锁状态的。
设置了锁状态之后,等待线程需要调用await方法线程才会进行等待。
await() throws InterruptedException {
1、以共享模式获取对象锁,如果线程中断则终止
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException
--线程中断抛出异常终止
if (Thread.interrupted())
throw new InterruptedException();
2、尝试获取锁,返回1可以获取
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
3、只有当锁状态为0的时候才可获取锁,即返回1
return (getState() == 0) ? 1 : -1;
}
而一开始我们初始化CountDownLatch的时候并不是为0,而是为1,所以调用await的线程并不会获取到锁。
此时会执行doAcquireSharedInterruptibly方法,当前线程进入AQS维护的线程等待队列中,死循环的去获取共享锁(可以同时被多个线程获取)。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException
4、将当前线程加入等待队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
5、重试获取锁,只有当锁状态为0的时候才可获取,
获取到之后r是1
int r = tryAcquireShared(arg);
if (r >= 0) {
6、获取之后将当前线程设置为线程头结点,
并去唤醒下一个节点上的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
7、获取失败后会检查上一个节点的状态,看是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
第6步获取锁之后将当前线程设置为头结点,并尝试去唤醒下一个线程获取共享锁
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
--唤醒下一个等待的线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
上述就是等待线程去获取锁的流程,当锁状态不为0的时候,调用await的线程会一直阻塞,直至锁状态为0,那么什么时候锁的状态会为0呢?
这就是countDown方法的用处了,没调用一次countDown方法,锁的状态都会进行减低1,直至锁状态为0
public void countDown() {
1、释放锁
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
2、尝试释放共享锁,也就是将锁状态减1
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
3、锁状态递减1,直至为0返回true,否则返回false
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
当锁状态递减为0的时候,调用await的等待线程就可以获取到锁,此时等待线程就可以被唤醒继续往下执行了。
上述就是CountDownLatch的用法以及源码解析。