前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【小家java】JUC并发编程工具之CountDownLatch(闭锁)、CyclicBarrier、Semaphore的使用

【小家java】JUC并发编程工具之CountDownLatch(闭锁)、CyclicBarrier、Semaphore的使用

作者头像
YourBatman
发布2019-09-03 15:14:17
5420
发布2019-09-03 15:14:17
举报

前言

这三个类都是JDK5为我们提供的处理并发编程的工具。

CountDownLatch:是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。

CyclicBarrier:字面意思是可循环使用(Cyclic)的屏障(Barrier),它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier可重用

Semaphore:Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可

CountDownLatch:闭锁

CountDownLatch也常常被我们称为闭锁,是JUC提供给我们算是比较常用的一个工具了。

最重要的三个方法如下:

public void await() throws InterruptedException { };   //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { };  //将count值减1
如何实现的?

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

使用场景:
  1. 实现最大并行性:注意不是并发,是并行。并行强调的是所有人同一时刻统一开始。比如:我们想测试一个单例是否有问题,用最大并行数的线程将很容易测试出来。比如:我们跑步,所有人必须在起跑线同一时刻听到枪声才能开跑的场景
  2. 开始执行前等待n个线程完成各自任务:这种使用场景应该是最多的。比如:应用程序启动前要求其余所有组件都加载完毕。比如:SpringCloud的健康检查。比如:经典的“一家人一起吃个饭”场景
  3. 死锁检测:一个非常方便的辅助测试的场景,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。
代码示例:

模拟SpringCloud的健康检查。

这种服务级别的最终up或者down,可以并行的由各个部分去心跳请求决定,最终汇总结果即可。代码如下:

先可抽象出一个BaseHealthChecker:它抽象出所有健康检查的行为

/**
 * 基类:定义了检查的行为
 *
 * @author fangshixiang
 * @description //
 * @date 2018/12/15 14:52
 */
@Getter
public abstract class BaseHealthChecker implements Runnable {

    private String serviceName; //检查的名称:如网络检查 DB检查 redis检查等等
    private boolean serviceUp; //是否健康 up

    //闭锁应该是同一把 所以传进来
    private CountDownLatch latch;

    public BaseHealthChecker(String serviceName, CountDownLatch latch) {
        super();
        this.latch = latch;
        this.serviceName = serviceName;
        this.serviceUp = false;
    }

    @Override
    public void run() {
        try {
            verifyService();
            serviceUp = true;
        } catch (Throwable t) {
            t.printStackTrace(System.err);
            serviceUp = false;
        } finally {
            if (latch != null) {
                latch.countDown();
            }
        }
    }

    /**
     * 各调用者只需要去实现这个行为即可(此处需要注意:不要返回值  我会认为没有报错 就认为是健康的)
     */
    public abstract void verifyService();
}

检查网络的检查类:NetworkHealthChecker

/**
 * 检查网络
 *
 * @author fangshixiang
 * @description //
 * @date 2018/12/15 14:53
 */
public class NetworkHealthChecker extends BaseHealthChecker {

    public NetworkHealthChecker(CountDownLatch latch) {
        super("Network Service", latch);
    }

    //模拟网络检查  只要不抛出异常 就认为是up的
    @Override
    public void verifyService() {
        System.out.println("Checking " + this.getServiceName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}

检查DB、redis都类似DBHealthChecker :

/**
 * 检查DB数据库
 *
 * @author fangshixiang
 * @description //
 * @date 2018/12/15 14:53
 */
public class DBHealthChecker extends BaseHealthChecker {

    public DBHealthChecker(CountDownLatch latch) {
        super("Network Service", latch);
    }

    //模拟网络检查  只要不抛出异常 就认为是up的
    @Override
    public void verifyService() {
        System.out.println("Checking " + this.getServiceName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}

现在可以写一个main方法,来测凸显一下闭锁的功能了:

    //需要进行健康检查的所有的服务们
    private static List<BaseHealthChecker> services = new ArrayList<>();
    //闭锁
    private static CountDownLatch latch;

    private static boolean checkServiceIsUp() throws InterruptedException {
        //先必须要知道 一共有哪些服务是需要健康检查的
        latch = new CountDownLatch(3);

        services.add(new NetworkHealthChecker(latch));
        services.add(new DBHealthChecker(latch));
        services.add(new RedisHealthChecker(latch));

        //启动一个线程池  开多线程去同时健康检查
        Executor executor = Executors.newFixedThreadPool(services.size());
        for (final BaseHealthChecker v : services) {
            executor.execute(v);
        }

        latch.await();

        for (BaseHealthChecker v : services) {
            if (!v.isServiceUp()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean b = checkServiceIsUp();
        System.out.println("总体服务的up状态为:" + b);
    }
输出:
Checking Network Service
Checking DB Service
Checking Redis Service
Network Service is UP
DB Service is UP
Redis Service is UP
总体服务的up状态为:true

我们发现就这样很高效的实现了服务的健康检查,并且耗时由耗时最长的额决定。

Java8提供了基于流式处理的类似功能类:Completablefuture,推荐使用。具体参考我之前的博文:【小家java】Java8新特性之—CompletableFuture的系统讲解和实例演示(使用CompletableFuture构建异步应用)

缺点

构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。

一家人一起吃饭代码示例:

先定义一些方法,模拟吃饭场景

    public static void fatherToRes() {
        System.out.println("爸爸步行去饭店需要3小时。");
    }

    public static void motherToRes() {
        System.out.println("妈妈挤公交去饭店需要2小时。");
    }

    public static void meToRes() {
        System.out.println("我乘地铁去饭店需要1小时。");
    }

    public static void togetherToEat() {
        System.out.println("一家人到齐了,开始吃饭");
    }

顺序执行:

    public static void main(String[] args) {
        fatherToRes();
        motherToRes();
        meToRes();
        togetherToEat();
    }
输出:
爸爸步行去饭店需要3小时。
妈妈挤公交去饭店需要2小时。
我乘地铁去饭店需要1小时。
一家人到齐了,开始吃饭

我们发现,光集合就花了6个小时。改进版本:

    public static void main(String[] args) {
        new Thread(() -> fatherToRes()).start();
        new Thread(() -> motherToRes()).start();
        new Thread(() -> meToRes()).start();
        togetherToEat();
    }
输出:
爸爸步行去饭店需要3小时。
一家人到齐了,开始吃饭
妈妈挤公交去饭店需要2小时。
我乘地铁去饭店需要1小时。

这个好像也不行,人还没到齐就开饭了。继续改进

    //定义一个变量 必须等于0了才开饭
    private static volatile int i = 3;

    public static void main(String[] args) {
        new Thread(() -> {
            fatherToRes();
            i--;
        }).start();
        new Thread(() -> {
            motherToRes();
            i--;
        }).start();
        new Thread(() -> {
            meToRes();
            i--;
        }).start();

        while (i != 0) {
            //此处一直hole住等待
        }
        togetherToEat();
    }
输出:
爸爸步行去饭店需要3小时。
妈妈挤公交去饭店需要2小时。
我乘地铁去饭店需要1小时。
一家人到齐了,开始吃饭

这个实际上达到了效果。但是,但是while盲等待是对于CPU的消耗太巨大了,我们需要更好的实现方式。(备注:此处用volatile修饰i是有并发问题的,读者可以使用AtomicInteger或者LongAdder改进,此处我就不改了哈)

可以参考:

【小家java】使用volatile关键字来实现内存可见性、实现轻量级锁

【小家java】AtomicLong可以抛弃了,请使用LongAdder代替(或使用LongAccumulator)

最终版本:

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            fatherToRes();
            latch.countDown();
        }).start();
        new Thread(() -> {
            motherToRes();
            latch.countDown();
        }).start();
        new Thread(() -> {
            meToRes();
            latch.countDown();
        }).start();

        latch.await();

        togetherToEat();
    }
输出:
妈妈挤公交去饭店需要2小时。
爸爸步行去饭店需要3小时。
我乘地铁去饭店需要1小时。
一家人到齐了,开始吃饭

这样子,我们就不用一直hold住cpu,不用盲等了。

CyclicBarrier

CyclicBarrier概念,前言里面已经有所介绍了。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

   public CyclicBarrier(int parties) {
        this(parties, null);
    }

我们把上面一家人一起吃饭的例子改造一下:

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

    public static void main(String[] args) throws Exception {
        new Thread(() -> {
            fatherToRes();
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
            }
        }).start();
        new Thread(() -> {
            motherToRes();
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
            }
        }).start();
        new Thread(() -> {
            meToRes();
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
            }
        }).start();

        //主线程也要await
        cyclicBarrier.await();

        togetherToEat();
    }

需要注意的是:

1、main主线程也是一个线程,所以也要await

2、new CyclicBarrier的值是4,而不是3(会有个线程控制不了),也不是5(程序将永远等待,因为没有第五个线程执行await方法,即没有第五个线程到达屏障,所以之前到达屏障的四个线程都不会继续执行。

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时(前提条件也必须是先达到屏障),优先执行barrierAction,方便处理更复杂的业务场景。(比如一家人吃饭,必须永远是爸爸先吃)

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> System.out.println("人到齐了,爸爸先动筷子"));

    public static void main(String[] args) throws Exception {
        new Thread(() -> {
            fatherToRes();
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
            }
        }).start();
        new Thread(() -> {
            motherToRes();
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
            }
        }).start();
        new Thread(() -> {
            meToRes();
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
            }
        }).start();

        //主线程也要await
        cyclicBarrier.await();

        togetherToEat();
    }
输出:
爸爸步行去饭店需要3小时。
妈妈挤公交去饭店需要2小时。
我乘地铁去饭店需要1小时。
人到齐了,爸爸先动筷子
一家人到齐了,开始吃饭
应用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

CyclicBarrier和CountDownLatch的区别
  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。
  • CountDownLatch是减计数方式,而CyclicBarrier是加计数方式
  • CountDownLatch不可以复用,而CyclicBarrier可以复用。
Semaphore

Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。

原来阐述

以一个停车场为例。加入一共只有3个停车位,那么当来了5辆车的时候,那么看门的人可以不受阻碍的放3辆车进去,其余的在入口处等候。这时候如果继续来车就都得在门外等候。

这时,如果有一辆车停车位里的车离开了,就可以放一辆进来了(至于放哪俩进来,有公平锁和非公平锁之分),如此往复。

每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程。

对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。

主要方法:
Semaphore(int permits):构造方法,创建具有给定许可数的计数信号量并设置为非公平信号量。
Semaphore(int permits,boolean fair):构造方法,当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。

void acquire():从此信号量获取一个许可前线程将一直阻塞。相当于一辆车占了一个车位。
void acquire(int n):从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。比如n=2,就相当于一辆车占了两个车位。
void release():释放一个许可,将其返回给信号量。就如同车开走返回一个车位。
void release(int n):释放n个许可。
int availablePermits():当前可用的许可数。
代码示例

就以上面的提车系统,用代码里实现(仅供参考)

    //控制三个停车位
    private static final Semaphore semaphore = new Semaphore(3);
    //线程池:核心线程数可以有5个
    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    //一辆车代表一个进程 进去提车系统
    private static class CarThread extends Thread {

        private final String name; //品牌名称
        private final int age; //使用了多少年

        public CarThread(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire(); //常识去获取许可 进入停车
                System.out.println(Thread.currentThread().getName() + ":大家好,我是【" + name + "】使用了【" + age + "】年,当前时间为:" + System.currentTimeMillis());
                Thread.sleep(1000); //模拟停车时长 停后离开

                System.out.println("【" + name + "】要准备离开停车场了,当前剩余空位【" + semaphore.availablePermits() + "】,当前时间为:" + System.currentTimeMillis());
                semaphore.release();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        String[] name = {"奔驰", "宝马", "奥迪", "大众", "尼桑", "wey派", "领克"};
        int[] age = {1, 3, 5, 6, 7, 10, 12};

        //一次性来了7辆车
        for (int i = 0; i < 7; i++) {
            threadPool.execute(new CarThread(name[i], age[i]));
        }
    }
输出:
pool-1-thread-3:大家好,我是【奥迪】使用了【5】年,当前时间为:1544953335062
pool-1-thread-2:大家好,我是【宝马】使用了【3】年,当前时间为:1544953335062
pool-1-thread-1:大家好,我是【奔驰】使用了【1】年,当前时间为:1544953335063
【奔驰】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953336081
pool-1-thread-1:大家好,我是【wey派】使用了【10】年,当前时间为:1544953336081
【奥迪】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953336085
pool-1-thread-4:大家好,我是【大众】使用了【6】年,当前时间为:1544953336088
【宝马】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953336088
pool-1-thread-5:大家好,我是【尼桑】使用了【7】年,当前时间为:1544953336089
【wey派】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953337083
pool-1-thread-3:大家好,我是【领克】使用了【12】年,当前时间为:1544953337083
【大众】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953337089
【尼桑】要准备离开停车场了,当前剩余空位【1】,当前时间为:1544953337094
【领克】要准备离开停车场了,当前剩余空位【2】,当前时间为:1544953338083

从打印的日志中我们可以看出,默认Semaphore是非公平锁的。那写在我们改一下,改动一句话:

private static final Semaphore semaphore = new Semaphore(3, true);

运行:

输出:
pool-1-thread-2:大家好,我是【宝马】使用了【3】年,当前时间为:1544953560457
pool-1-thread-3:大家好,我是【奥迪】使用了【5】年,当前时间为:1544953560457
pool-1-thread-1:大家好,我是【奔驰】使用了【1】年,当前时间为:1544953560457
【奔驰】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953561468
pool-1-thread-4:大家好,我是【大众】使用了【6】年,当前时间为:1544953561468
【奥迪】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953561468
pool-1-thread-5:大家好,我是【尼桑】使用了【7】年,当前时间为:1544953561468
【宝马】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953561469
pool-1-thread-1:大家好,我是【wey派】使用了【10】年,当前时间为:1544953561469
【大众】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953562468
pool-1-thread-3:大家好,我是【领克】使用了【12】年,当前时间为:1544953562468
【尼桑】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953562469
【wey派】要准备离开停车场了,当前剩余空位【0】,当前时间为:1544953562469
【领克】要准备离开停车场了,当前剩余空位【2】,当前时间为:1544953563470

从输出的结果可以看出"大众", “尼桑”, “wey派”, "领克"是排队按照顺序进入的,这时候就是公平锁了。

Semaphore内部基于AQS的共享模式,所以实现都委托给了Sync类。 (原理其实大家可以自行参照源码,也比较简单) new Semaphore(1)可以利用这个,间接实现单例模式

Semaphore总结

Semaphore主要用于控制当前活动线程数目,就如同停车场系统一般,而Semaphore则相当于看守的人,用于控制总共允许停车的停车位的个数。虽然我们自己也可以通过lock等手动来控制,但既然JUC为我们提供了便捷的工具,为何不使用呢?

Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。

相关面试题
  • 解释一下CountDownLatch概念?
  • CountDownLatch 和CyclicBarrier的不同之处?
  • 给出一些CountDownLatch使用的例子?
  • CountDownLatch 类中主要的方法?
  • 说说你对Semaphore的理解,可以写一个示例吗?
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年12月15日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • CountDownLatch:闭锁
    • 如何实现的?
      • 使用场景:
        • 代码示例:
          • 缺点
            • 一家人一起吃饭代码示例:
            • CyclicBarrier
              • 应用场景
              • CyclicBarrier和CountDownLatch的区别
              • Semaphore
                • 原来阐述
                  • 主要方法:
                    • 代码示例
                      • Semaphore总结
                      • 相关面试题
                      相关产品与服务
                      云数据库 Redis
                      腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档