前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Juc并发编程14——线程计数器CountdownLatch源码剖析

Juc并发编程14——线程计数器CountdownLatch源码剖析

作者头像
用户10127530
发布2022-10-26 18:07:14
2900
发布2022-10-26 18:07:14
举报
文章被收录于专栏:半旧的技术栈半旧的技术栈

前 言 🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端 🍌 专栏简介:juc并发编程,讲解锁原理、锁机制、线程池、AQS、并发容器、并发工具等,深入源码,持续更新。 🌰 文章简介:本文主要介绍常用的并发工具类:CountdownLatch,将深入剖析源码,讲解其使用与原理

线程计数器CountdownLatch源码剖析

文章目录

1 使用计数器锁实现任务计数

多任务同步神器,它允许一个或多个线程,等待其它线程完成工作,比如我们现在有一个需求:

  • 有20个任务,需要将每个任务的执行结果算出来,但是每个任务执行的时间未知。
  • 当所有的任务执行结束后,立即整合统计所有的执行结果。

我们并不知道任务可以在什么时间完成,因此执行统计的时间不好设置,设置短了则还有任务没有完成,设置长了则统计延迟。

CountdownLatch可以做到,它是一个实现子任务同步的工具。Demo如下。

代码语言:javascript
复制
  public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(20);
        for (int i = 0; i < 20; i++) {
            final int  finalI = i;
            new Thread(() ->{
                try {
                    Thread.sleep((long)(2000 + new Random().nextDouble()));
                    System.out.println("thread " + finalI + " finished");
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
                latch.countDown(); // 相当于计数器,每次减少1
            }).start();
        }

        latch.await(); //可以多个线程同时等待,这里仅演示了一个线程进行等待
        System.out.println("all sub task finished!");
    }

执行结果如下。

在这里插入图片描述
在这里插入图片描述

其实它就是一个线程计数器,注意CountDownLatch是一次性的,不能重复使用。比如下面再多调用一次latch.await,程序还是正常结束的(毕竟计数不可逆,已经是0了,而且无法将计数器重置).

代码语言:javascript
复制
 public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(20);
        for (int i = 0; i < 20; i++) {
            final int  finalI = i;
            new Thread(() ->{
                try {
                    Thread.sleep((long)(2000 + new Random().nextDouble()));
                    System.out.println("thread " + finalI + " finished");
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
                latch.countDown();
            }).start();
        }

        latch.await();
        
        latch.await();
        System.out.println("all sub task finished!");
    }

2 await的源码剖析

上面已经演示了使用,下面来看看它的原理吧。

代码语言:javascript
复制
public class CountDownLatch {
 	// 使用了AQS,不过是基于共享锁实现的 
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
			//这里实际上使用count作为了共享锁的state值
			// count数与共享锁的数量相同
			// 每调用一次countdown就是解一层锁
        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }
			// 重写了共享锁的实现
			// 获取共享锁其实就是等待其它线程把state减到0
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

			// 解锁的过程
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

  
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count); //构造sync方法
    }

	//通过acquireSharedInterruptibly获取共享锁,
	// 但是如果state不为0,将会被持续阻塞,后文详解
	public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
	
	// 同上,但是会被阻塞
  	public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
	//countDown其实就是解锁一次
  public void countDown() {
        sync.releaseShared(1);
    }

// 获取当前的计数,也就是AQS的state值
	public long getCount() {
        return sync.getCount();
    }
    
	public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

我们终于知道CountDownLatch它的原理了,原来它就是利用共享锁来实现计数的,锁的数量就是计数数量,countdown的过程就是解锁的过程。

我们在该系列博客已经介绍过独占锁,但是没有深入剖析过共享锁,这里我们来深入共享锁的源码进行剖析,以便大家对CountDownLatch具有更为深入的理解。

点进AbstractQueuedSynchronizer类中。先看看acquireShared获取共享锁,这个就是CountDownLatchawait方法调用的底层方法(实际上是acquireSharedInterruptibly,不过原理是一样的)。

代码语言:javascript
复制
public final void acquireShared(int arg) {
			// 尝试获取共享锁,小于0则失败
        if (tryAcquireShared(arg) < 0)
        	// 获取共享锁失败,进入阻塞
            doAcquireShared(arg);
}

接下来看看doAcquireShared是如何进行阻塞的。

代码语言:javascript
复制
 private void doAcquireShared(int arg) {
 		//向等待队列中添加一个新的共享锁节点
        final Node node = addWaiter(Node.SHARED); 
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) { // 无限循环
               // 获取当前新建节点的前驱节点
                final Node p = node.predecessor(); 
                //如果前驱节点是头节点,说明当前节点是等待队列的队首节点
                if (p == head) {
                	//再次尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) { //获取成功
                       //将当前节点设置为头节点,并且继续唤醒后继节点
                        setHeadAndPropagate(node, r); 
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 和独占锁类似,没有获取到共享锁,挂起线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
            	// 如果最后还没有获取到,直接cancel
                cancelAcquire(node);
        }
    }

原来阻塞就是挂起线程呀,我们其实早就知道了,只不过验证了下。而且上面的过程和独占锁其实很类似,不过在获取到节点后,不仅将当前线程设置成了头节点,而且还唤醒了后继节点。这就是共享锁的传播性:当前节点被唤醒后,后继节点也会被唤醒。这是因为可能不止一个线程调用了await方法进行等待。

究竟是如何进行的传播与唤醒呢?走进setHeadAndPropagate来一探究竟吧。

代码语言:javascript
复制
 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; //取出头节点
        setHead(node); //将当前节点设置为头节点
       //doAcquireShared中传参propagate一定大于0   
       //waitStatus初始为0,SIGNAL=-1;       
      // CONDITION = -2,PROPAGATE = -3;
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 取出当前节点的后继节点
            Node s = node.next;
            //共享模式或者s为空,继续doReleaseShared
            if (s == null || s.isShared())
            	//继续唤醒下一个节点
                doReleaseShared(); 
        }
    }

原来就是判断是不是共享模式,是就继续调用doReleaseShared唤醒下一个节点,doReleaseShared后面会讲.

3 countdown源码剖析

了解完await的底层原理,这里我们接下来看下countdown方法的底层原理。

看看它的底层调用方法

代码语言:javascript
复制
public void countDown() {
        sync.releaseShared(1);
}

点进releaseShared

代码语言:javascript
复制
    public final boolean releaseShared(int arg) {
    	// 尝试释放锁,成功返回true
    	// tryReleaseShared在前面讲await源码时讲过
    	//只有当锁数量为0时才会释放成功
        if (tryReleaseShared(arg)) {
        		// 继续唤醒后续节点
            doReleaseShared();
            return true;
        }
        return false;
    }

原来countdownawait最后都会调用doReleaseShared唤醒其它节点,前文留下的悬念是时候解开了,那就看看doReleaseShared吧。

代码语言:javascript
复制
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 如果头节点不为空,而且头、尾节点不相同
            //说明等待队列中存在节点
            if (h != null && h != tail) {
            	// 获取头节点的等待状态
                int ws = h.waitStatus;
                // 如果是SIGNAL(表示后继节点被挂起),
                //就将头节点的状态设置为初始值
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; //失败就重来          
                    // 当头节点被唤醒,会唤醒头节点的后继节点
                    unparkSuccessor(h);
                }
                // 如果头节点的等待状态是初始状态0
                // 尝试将其状态设置为PROPAGATE(表示后继节点已经被唤醒)
                //PROPAGATE状态在setHeadAndPropagate中用到
                //可以让唤醒操作向后继节点传播
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;         // loop on failed CAS
            }
            if (h == head)         // loop if head changed
                break;
        }
    }

可能看下来觉得还是比较凌乱,没关系,我们可以回过头对本文章的源码与本专栏的AQS源码多读几遍,这里也梳理下,方便大家理解。

  • 共享锁是线程共享的,同一个时刻可能有多个线程拥有共享锁。
  • 如果一个线程刚获取到了共享锁,那么在其之后等待的线程很有可能也能够获取到共享锁,因此需要传播唤醒后继节点
  • 如果一个线程刚刚释放了线程锁,那么无论是共享锁还是独占锁,都需要传播唤醒后继节点。

这篇文章就更新完了,下篇文章将讲解循环屏障,CyclickBarrier,敬请期待。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-06-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 线程计数器CountdownLatch源码剖析
    • 文章目录
      • 1 使用计数器锁实现任务计数
        • 2 await的源码剖析
          • 3 countdown源码剖析
          相关产品与服务
          容器服务
          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档