前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java - CountDownLatch学习和使用

Java - CountDownLatch学习和使用

作者头像
夹胡碰
发布2020-12-29 11:20:39
4710
发布2020-12-29 11:20:39
举报
文章被收录于专栏:程序猿~程序猿~

CountDownLatch常用于多线程开发中,某些线程需要等待一部分线程执行完毕之后再进行。最常见的场景就是一部调用多个接口,然后将返回值汇聚成一个汇总结果。下面将介绍CountDownLatch的使用及原理解析。

1. 使用演示

代码语言:javascript
复制
public class CountDownLatchTest {

    private static CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(new MThread()).start();
        new Thread(new MThread()).start();
        Thread.sleep(2000);
        countDownLatch.countDown();
        countDownLatch.countDown();
    }

    public static class MThread implements Runnable{

        @Override
        public void run() {
            try {
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + "run结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 输出
代码语言:javascript
复制
out => 
Thread-0run结束
Thread-1run结束

2. 底层逻辑展示

CountDownLatch底层依赖AQS实现。

  1. 初始化时通过构造方法将state赋值。
  2. 当有线程(可以是多个)执行await时,进入到等待队列中,并执行LockSupport.park挂起线程。
  3. 当有线程执行countDown时,进行CAS的state-1操作。
  4. 当线程执行countDown,执行完state-1操作发现state == 0时,执行LockSupport.unpark唤醒等待队列中的全部线程。

下面是流程图:

3. 重点源码说明

  • 初始化
代码语言:javascript
复制
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
代码语言:javascript
复制
Sync(int count) {
    setState(count); // 初始化state
}
  • 执行await
代码语言:javascript
复制
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
代码语言:javascript
复制
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
代码语言:javascript
复制
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; // 如果state == 0 则直接放行,如果state != 0则阻塞线程
}
代码语言:javascript
复制
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); // 创建共享锁节点
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 执行LockSupport.park阻塞线程
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
代码语言:javascript
复制
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);// 执行LockSupport.park阻塞线程
    return Thread.interrupted();
}
  • 执行coutDown
代码语言:javascript
复制
public void countDown() {
    sync.releaseShared(1);
}
代码语言:javascript
复制
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 尝试释放共享锁
        doReleaseShared(); // 释放共享锁
        return true;
    }
    return false;
}
代码语言:javascript
复制
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc)) // CAS 执行state - 1
            return nextc == 0; // 如果state == 0,则表示可以释放共享锁
    }
}
代码语言:javascript
复制
private void doReleaseShared() {
    for (;;) { // 唤醒等待队列中的全部阻塞线程
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 使用演示
  • 2. 底层逻辑展示
  • 3. 重点源码说明
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档