前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JUC并发编程之CountDownLatch源码详解

JUC并发编程之CountDownLatch源码详解

作者头像
黎明大大
发布2021-07-23 14:21:38
3040
发布2021-07-23 14:21:38
举报
文章被收录于专栏:java相关资料java相关资料

1

前言

关于JUC包下的工具类,到目前为止已经分享了ReentranLock、Semaphore这两个工具类,同样很多前置内容在前面两遍博文中也都要讲到,那么今天所分享的是CountDownLatch工具类、通过前面博文我们知道ReentranLock是独占锁模式、Semaphore是共享锁模式、那么CountDownLatch是什么模式呢?CountDownLatch它是闭锁模式。

什么是闭锁?

闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行

例如:

1.确保某个服务在其依赖的其他服务都启动之后才启动

2.等待某个操作的所有参与者都准备就绪才继续执行

2

什么是CountDownLatch

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

如上这段话可能不是那么好理解,我举一个生活中很通俗的例子:

一家人要外出自驾游,旅游的家庭成员总共有5位,只有当5位家庭成员全部上车了,爸爸才会开始开车出发对吗?假如爸爸现在通知家庭成员要出发了,其他的3位成员陆陆续续的上车,此时车上已有4位成员了,

还有一位小妹妹在上厕所,这个时候爸爸以及其他3位成员需要等待妹妹上车后,才会开车出发旅游。

3

CountDownLatch的使用场景

在我们平常开发过程中,需要对某些接口进行高平发测试,一般我们会想到通过jmeter性能工具进行压测,但是我们有没有java工具类去帮我实现这种并发测试的API工具呢?当然是有的,就是CountDownLacth工具类,我们可以在代码中模拟高并发测试接口的场景

以及上述所说的自驾游场景,当然大家也可以自己发挥想象,项目中是否有场景能够用到该工具类。

4

CountDownLatch源码详解

如何使用CountDownLatch

聊完CountDownLatch的使用场景后,我们来看看基于上面的场景通过CountDownLatch来实现相应的功能

/**
 * @author sunny
 * @date 2021/07/06 09:30
 * @description
 */
@Slf4j
public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        test01();
//        test02();
//        test03();
    }
    /**
     * 模拟高并发场景
     *
     * @throws InterruptedException
     */
    public static void test01() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    log.info("{}:线程已就绪,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());
                    countDownLatch.await();
                    log.info("{}:线程已释放,,当前时间戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        TimeUnit.SECONDS.sleep(3);
        System.out.println("\n ========================= \n");
        countDownLatch.countDown();
    }
    /**
     * 模拟家庭旅游场景
     */
    public static void test02() {
        CountDownLatch countDownLatch = new CountDownLatch(11);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                log.info("{}:已经上车了", Thread.currentThread().getName());
                countDownLatch.countDown();
            }).start();
        }
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                log.info("{}:五秒后已经上车了", Thread.currentThread().getName());
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("\n ========================= \n");
            countDownLatch.await();
            log.info("开车旅游啦");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    /**
     * 模拟去峨眉山游玩,但是因为中途有道路需要修路,所以需要等待修完后才能出发
     * 那么修路的过程中
     */
    public static void test03() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 1; i <= 9; i++) {
            new Thread(() -> {
                try {
                    log.info("{}:线程,我要准备前往峨眉山,路被堵住了", Thread.currentThread().getName());
                    countDownLatch.await();
                    log.info("{}:线程,道路终于可以通行了", Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        TimeUnit.SECONDS.sleep(3);
        log.info("======================准备开始动工======================");
        for (int i = 1; i <= 9; i++) {
            Thread.sleep(300);
            int fi = i;
            new Thread(() -> {
                countDownLatch.countDown();
                log.info("{}:线程,已开工:{}天,剩余:{}天", Thread.currentThread().getName(), fi, countDownLatch.getCount());
            }).start();
        }
        TimeUnit.SECONDS.sleep(6);
        new Thread(() -> {
            log.info("{}:线程,已开工", Thread.currentThread().getName());
            countDownLatch.countDown();
        }).start();
        TimeUnit.SECONDS.sleep(8);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    countDownLatch.await();
                    log.info("{}:线程,去峨眉山游玩啦", Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

源码分析

CountDownLatch有一个构造方法,传入的参数给state进行初始化,在CountDownLatch中state,我们就可以理解闭锁值,例如上面所说的家庭自驾游案例,当家庭成员全部到位,爸爸才会开车出发旅游对么?

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);  // 更新 state 值
}

首先从 "countDownLatch.await()" 作为源码入口

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

在该方法内有两个if判断,首先判断当前线程是否被中断,如果被中断了则直接抛出异常,而tryAcquireShared()方法则是通用模板方法,不同的子类根据自己的特性实现具体的逻辑

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

具体的加锁逻辑由子类自身的特性去具体实现的,在CountDownLatch中,它的加锁钩子方法如下所示,如果不进行重写该方法,则强制抛出异常。

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

接着我们走到CountDownLatch所实现逻辑代码块,该方法很简单,就是一个三元表达式,如果当前线程获取的state为0则代表无需进行等待,否则需要进行入队等待。就如刚刚前面所讲的自驾游场景案例,只有当所有成员全部上车了,才会开车出发,到这里还是很好理解的对吗?哈哈哈,我们接着往下看。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

然后我们回退到上一步,看到acquireSharedInterruptibly()方法,如果state大于0则返回-1,从而会进入到doAcquireSharedInterruptibly()方法,这个方法与Semaphore()逻辑几乎一样,只是我们需要理解概念所一样而已。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

然后我们进入到doAcquireSharedInterruptibly()方法,主要的逻辑都在自旋里面,但是外面同样也有个比较重要的方法,就是addWaiter()方法,该方法传入的参数值为 "Node.SHARED" ,而SHARED的值就是new Node() 也就是创建了一个空的节点,然后我们来看看addWaiter()方法其内部逻辑做了些什么事情?

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);  // 构建双向链表 或 入队操作
    boolean failed = true;
    try {
        for (;;) { // 自旋
            final Node p = node.predecessor();  //获取当前节点的前驱节点
            if (p == head) {
                int r = tryAcquireShared(arg);  // 尝试获取令牌
                if (r >= 0) {  // 获取令牌成功
                    setHeadAndPropagate(node, r);  //传播链表
                    p.next = null; // help GC    将前驱节点的引用指向为NULL,待垃圾回收器回收
                    failed = false;
                    return;  // 获取令牌成功,退出自旋
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())   // 阻塞当前线程
                throw new InterruptedException();
        }
    } finally {
        // 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
        if (failed)
            cancelAcquire(node);
    }
}

使文字更好理解代码这里先做前缀说明,node = 当前节点,tail = 链表末尾节点,head = 链表头节点

首先将当前线程封装为node节点,接着获取tail节点,判断当前AQS中是否存在双向链表,如果存在的话,将node前驱节点引用指向tail节点,通过cas将node节点设置为末尾节点,如果设置成功则将tail节点的后驱引用指向node,那么node就顺理成章的成了双向链表的末尾节点了。关于这里我们其实需要思考一个问题,在多线程情况下同时通过cas去设置尾节点,此时只会有一个线程设置成功且返回出去,那接下来的线程该怎么办呢?且不急,带着这个疑问我们进入到enq方法

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);   // 封装节点
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;  // 获取末尾节点
    if (pred != null) {
        node.prev = pred;   // 当前节点的前驱引用指向为pred
        if (compareAndSetTail(pred, node)) {  // 将当前节点设置为链表末尾节点
            pred.next = node;  // 原末尾节点后驱引用指向为当前节点
            return node; 
        }
    }
    enq(node);
    return node;
}

基于FIFO入队流程图

通过如下图理解上面这段话,我相信应该是能够明白的

使文字更好理解代码这里先做前缀说明,node = 当前节点,tail = 链表末尾节点,head = 链表头节点

得勒,进来就是一层自旋,注意这里的精华就是自旋,以及上面所提到多线程通过cas设置尾节点失败的解决方案就在此方法。

进入自旋获取链表的末尾节点,如果获取tail为null则证明当前并没有构成双向链表,接着通过cas去设置head,然后将head指向tail,这样双向链表就完成了,如果获取tail不为null,将node前驱引用指向tail节点,然后tail的后驱节点引用指向node节点,然后返回出去。那如果设置失败了怎么办呢?回到上面的问题,问题不大,这方法不是自旋嘛,它会一直自旋到你设置成功为止,才退出自旋。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail; // 获取末尾节点
        if (t == null) { // Must initialize   // 构建双向链表
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

如果通过cas设置不成功,就一直进行自旋,直到设置成功才退出循环。

接着,回退到doAcquireSharedInterruptibly()方法,通过上面的流程下来,我们就知道node节点现在已经成功入队到双向链表中,接着判断如果当前节点的前驱节点是为头节点此时会尝试获取令牌,如果获取失败则将线程进行阻塞,同理当前节点的前驱节点不是链表的头节点,也会将当前线程进行阻塞。无论如何只要令牌没有了,就得老老实实的在队列中进行呆着,直到下一次的唤醒。

那如果线程为头节点且获取令牌成功了,setHeadAndPropagate()方法又会做些什么事情呢?带着这个疑问,我们进去一探究竟

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);  // 构建双向链表 或 入队操作
    boolean failed = true;
    try {
        for (;;) { // 自旋
            final Node p = node.predecessor();  //获取当前节点的前驱节点
            if (p == head) {
                int r = tryAcquireShared(arg);  // 尝试获取令牌
                if (r >= 0) {  // 获取令牌成功
                    setHeadAndPropagate(node, r);  //传播链表
                    p.next = null; // help GC    将前驱节点的引用指向为NULL,待垃圾回收器回收
                    failed = false;
                    return;  // 获取令牌成功,退出自旋
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && //判断线程是否需要被阻塞
                parkAndCheckInterrupt())   // 阻塞当前线程
                throw new InterruptedException();
        }
    } finally {
        // 如果某个线程被中断,非正常流程退出则将当前线程的节点设置为cancel状态
        if (failed)
            cancelAcquire(node);
    }
}

首先我们看到该方法的入参内容,node:当前获取令牌线程节点,propagate: 值是根据获取state是否等于0判断,如果等0这么为1否则为-1

该方法主要作用在于两点,第一点:将当前节点设置为头节点,第二点:自动唤醒下一个节点

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||   // 还有令牌可获取 || 头节点状态处于等待状态
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;  // 获取当前下一节点
        if (s == null || s.isShared())  // 判断下节点是否为共享节点
            doReleaseShared();  // 传播~~ 具体传播什么呢???
    }
}

稍微可以看下设置头节点方法,也就是出队操作,主要就是将当前线程设置为头节点,然后将当前节点的前驱节点引用指向为null,配合方法外,会将之前的头节点的next节点设置为null,那么之前的头节点也就自然会被垃圾回收器进行

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

基于FIFO出队流程图

又一次来到自旋,首先验证链表中是否还存在多个节点,如果存在且状态为SIGNAL会将head的后驱节点进行唤醒。这里没啥太多好说的,就是一个传播概念,当你有多个节点在阻塞中,当state为0,是不是我的所有阻塞节点都需要被唤醒,然后执行后续的逻辑对么?

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {  // 自旋   可以理解为传播 【加自旋的原因,可能同时有多个令牌被释放,那么在这里就可以唤醒后续所有节点去获取令牌,就不用在前面再去判断是否要去唤醒后驱节点了。 如果没有获取到令牌也没关系,后面还是会将没有抢到的线程进行阻塞住】
        Node h = head;  
        if (h != null && h != tail) {  // 头节点不为null 其 头非等于尾节点 则证明当前链表还有多个节点
            int ws = h.waitStatus;   // 获取head的节点状态
            if (ws == Node.SIGNAL) {  // 如果当前节点状态为SIGNAL,就代表后驱节点正在被阻塞着
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  // 通过cas将状态从等待更换为非等待,然后取反的话,将下一个节点唤醒
                    continue;            // loop to recheck cases
                unparkSuccessor(h);  // 唤醒线程 去获取令牌
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  // 如果节点状态已经为0,则会将节点的状态更新为PROPAGATE   PROPAGATE:表示下一次共享式同步状态获取将会被无条件地传播下去
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;   // 跳出当前循环
    }
}

unparkSuccessor()方法很简单,在正常流程下它只会通过LockSupport.unpark(),将下一节点进行唤醒

private void unparkSuccessor(Node node) {
    // 先获取head节点的状态,应该是等于-1,原因在shouldParkAfterFailedAcquire方法中有体现
    int ws = node.waitStatus;
    
    // 由于-1会小于0,所以更新改为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 获取第一个正常排队的节点
    Node s = node.next;
    
    //正常解锁流程不会走该if判断
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    
    // 正常来说第一个排队的节点不应该为空,所以直接把第一个排队的线程唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

接下来看看countDown()方法到底也做了些什么流程操作

首先从 "countDownLatch.countDown()" 作为源码入口

sync.releaseShared(1);

我们来看到releaseShared方法,该方法内部有两个核心方法,我们先进入看看tryReleaseShared做了些什么事情

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {  //通用释放令牌
        doReleaseShared();  //唤醒后驱节点
        return true;
    }
    return false;
}

我们又看到了自旋,判断当前的state值是否等于0,等于0则代表需要提前的准备的线程都已就绪,主线程也可以执行剩下的业务逻辑啦,那如果不为0怎么办?一直自减,直到减到state为0,然后将链表内的线程全部进行唤醒。也就是会走到我上面所说到的doReleaseShared()方法

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

那么到这CountDownLatch源码分析到此结束了,相信大家伙如果看过我前面两篇文章,再看这篇博文会发现理解起来非常简单的。

JUC并发编程之CountDownLatch源码讲解视频

我是黎明大大,我知道我没有惊世的才华,也没有超于凡人的能力,但毕竟我还有一个不屈服,敢于选择向命运冲锋的灵魂,和一个就是伤痕累累也要义无反顾走下去的心。

如果您觉得本文对您有帮助,还请关注点赞一波,后期将不间断更新更多技术文章

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-07-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 黎明大大 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档