前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS源码分析之CountDownLatch

AQS源码分析之CountDownLatch

作者头像
山行AI
发布2020-03-25 09:25:42
3600
发布2020-03-25 09:25:42
举报
文章被收录于专栏:山行AI

这个类使一个线程等待其他线程各自执行完毕后再执行。是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

CountDownLatch内部类

java.util.concurrent.CountDownLatch.Sync

代码语言:javascript
复制
 private static final class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {            // 设置锁的数量            setState(count);        }
        int getCount() {            // 获取锁的数量            return getState();        }
         // 尝试获取锁        protected int tryAcquireShared(int acquires) {            return (getState() == 0) ? 1 : -1;        }
        protected boolean tryReleaseShared(int releases) {            // Decrement count; signal when transition to zero            // 每释放一个count减1            for (;;) {// 无限循环                // 用AQS状态值作为锁的count值                int c = getState();                // 锁数量为0时返回false                if (c == 0)                    return false;                // 不为0,减1                int nextc = c-1;                // 将最新值cas设置到state中去                if (compareAndSetState(c, nextc))                    return nextc == 0;            }        }    }

tryAcquireShared方法在调用时只有在state值为0时才会返回1,否则会一直返回-1。

CountDownLatch属性与构造方法

代码语言:javascript
复制
 private final Sync sync;
    public CountDownLatch(int count) {        if (count < 0) throw new IllegalArgumentException("count < 0");        this.sync = new Sync(count);    }

内部的主要工作是通过Sync来处理。

下面来看下该类的主要的两个方法,await和countDown方法。

java.util.concurrent.CountDownLatch#await()方法

代码语言:javascript
复制
 public void await() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }

内部调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly方法:

代码语言:javascript
复制
 public final void acquireSharedInterruptibly(int arg)            throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        if (tryAcquireShared(arg) < 0)//获取不到许可时,调用doAcquireSharedInterruptibly方法            doAcquireSharedInterruptibly(arg);    }
private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {        // 向AQS队列添加一个SHARED状态的节点        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            for (;;) {// 无限循环                // 获取前置节点                final Node p = node.predecessor();                // 如果前置节点是头节点                if (p == head) {                    // 尝试获取共享许可                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        // 成功了则设置头节点并进行传播                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return;                    }                }                // 判断是否需要在失败时进行park(即等待)                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();            }        } finally {            if (failed)                // 取消获取许可                cancelAcquire(node);        }    }

这里有一点需要注意的是如果有new CountDownLatch(10),那么state的值就被设置为10,调用await方法的线程调用tryAcquireShared方法时会返回-1,然后进入shouldParkAfterFailedAcquire方法,线程最终会park,直到state被调用countDown方法的线程减少到0。

java.util.concurrent.CountDownLatch#countDown方法

代码语言:javascript
复制
 public void countDown() {        sync.releaseShared(1);    }

此时再回过头来看下releaseShared方法:

代码语言:javascript
复制
     public final boolean releaseShared(int arg) {        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }
    protected boolean tryReleaseShared(int releases) {            // Decrement count; signal when transition to zero            // 每释放一个count减1            for (;;) {// 无限循环                // 用AQS状态值作为锁的count值                int c = getState();                // 锁数量为0时返回false                if (c == 0)                    return false;                // 不为0,减1                int nextc = c-1;                // 将最新值cas设置到state中去                if (compareAndSetState(c, nextc))                    return nextc == 0;            }        }
    // 共享模式下的释放动作-表示唤醒后继节点并确保传播    private void doReleaseShared() {        for (;;) {// 无限循环            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    // cas头节点的waitStatus                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))                        continue;            // loop to recheck cases                    // 唤醒后继节点                    unparkSuccessor(h);                }                else if (ws == 0 &&                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        }    }
  • tryReleaseShared方法会减少state的值,直到为0,在此之前都是返回true;
  • doReleaseShared方法会尝试唤醒AQS中处于SIGNAL状态的头节点和后继节点。这里唤醒SIGNAL状态的头节点操作位于doAcquireSharedInterruptibly中的无限for循环(之前讲伪唤醒时有分析过这个)。

示例

worker线程:
代码语言:javascript
复制
public class Worker implements Runnable{  
    private CountDownLatch downLatch;      private String name;  
    public Worker(CountDownLatch downLatch, String name){          this.downLatch = downLatch;          this.name = name;      }  
    public void run() {          this.doWork();          try{              TimeUnit.SECONDS.sleep(new Random().nextInt(10));          }catch(InterruptedException ie){          }          System.out.println(this.name + "工作结束!");          this.downLatch.countDown();  
    }  
    private void doWork(){          System.out.println(this.name + "正在工作!");      }  
}  
wait线程,也就是boss线程:
代码语言:javascript
复制
public class Boss implements Runnable {  
    private CountDownLatch downLatch;  
    public Boss(CountDownLatch downLatch){          this.downLatch = downLatch;      }  
    public void run() {          System.out.println("老板正在等所有人工作完成......");          try {              this.downLatch.await();          } catch (InterruptedException e) {          }          System.out.println("所有人工作完成了,老板开始检查工作!");      }  
}  
main方法:
代码语言:javascript
复制
 public static void main(String[] args) {          ExecutorService executor = Executors.newCachedThreadPool();  
        CountDownLatch latch = new CountDownLatch(3);  
        Worker w1 = new Worker(latch,"zs");          Worker w2 = new Worker(latch,"ls");          Worker w3 = new Worker(latch,"ww");  
        Boss boss = new Boss(latch);  
        executor.execute(w3);          executor.execute(w2);          executor.execute(w1);          executor.execute(boss);  
        executor.shutdown();      }  

总共三个许可,三个worker,一个boss,每个worker工作完成之后打卡,boss一直等待直到所有worker工作完成。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CountDownLatch内部类
    • java.util.concurrent.CountDownLatch.Sync
      • CountDownLatch属性与构造方法
        • java.util.concurrent.CountDownLatch#await()方法
          • java.util.concurrent.CountDownLatch#countDown方法
          • 示例
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档