专栏首页JVMGCJava同步组件之CountDownLatch,Semaphore
原创

Java同步组件之CountDownLatch,Semaphore

Java同步组件概况

  • CountDownLatch : 是闭锁,通过一个计数来保证线程是否一直阻塞
  • Semaphore: 控制同一时间,并发线程数量
  • CyclicBarrier:字面意思是回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
  • ReentrantLock:是一个重入锁,一个线程获得了锁之后仍然可以反复加锁,不会出现自己阻塞自己的情况。
  • Condition:配合ReentrantLock,实现等待/通知模型
  • FutureTask:FutureTask实现了接口Future,同Future一样,代表异步计算的结果。

CountDownLatch 同步辅助类

CountDownLatch类位于java.util.concurrent包,利用它可以实现类似计数器的功能,比如有一个任务A,它要等待其它4个任务执行完毕后才能执行,此时就可以使用CountDownLatch来实现这种功能。

image-20210115093334548

假设计数器的值是3,线程A调用await()方法后,当前线程就进入了等待状态,之后其它线程中执行CountDownLatch,计数器就会减1,当计数器从3变成0,线程A继续执行,CountDownLatch这个类可以阻塞当前线程,保证线程在某种条件下,继续执行。

构造器中的计数值(count)实际上就是闭锁需要等待的线程数量,这个值只能被设置一次,而且CountDownLatch没有提供任何机会修改这个计数值。

CountDownLatch代码案例

package com.rumenz.task;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class CountDownLatchTest {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch=new CountDownLatch(2);
        executorService.execute(()->{
            try{
                Thread.sleep(3000);
                System.out.println("任务一完成");

            }catch (Exception e){
               e.printStackTrace();
            }


            countDownLatch.countDown();
        });
        executorService.execute(()->{
            try{
                Thread.sleep(5000);
                System.out.println("任务二完成");

            }catch (Exception e){
                e.printStackTrace();
            }

            countDownLatch.countDown();
        });
        countDownLatch.await();
        //所有子任务执行完后才会执行
        System.out.println("主线程开始工作.....");
        executorService.shutdown();
    }
}

任务一完成
任务二完成
主线程开始工作.....

CountDownlatch指定时间完成任务,如果在规定时间内完成,则等待之前的等待线程(countDownLatch.await())继续执行

countDownLatch.await(int timeout,TimeUnit timeUnit);设置,第一个参数没超时时间,第二个参数为时间单位。

package com.rumenz.task;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class CountDownLatchTest {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch=new CountDownLatch(2);
        executorService.execute(()->{
            try{
                Thread.sleep(3000);
                System.out.println("任务一完成");

            }catch (Exception e){
               e.printStackTrace();
            }


            countDownLatch.countDown();
        });
        executorService.execute(()->{
            try{
                Thread.sleep(5000);
                System.out.println("任务二完成");

            }catch (Exception e){
                e.printStackTrace();
            }

            countDownLatch.countDown();
        });
        //这里只等3秒
        countDownLatch.await(3, TimeUnit.SECONDS);
        //所有子任务执行完后才会执行
        System.out.println("主线程开始工作.....");
        executorService.shutdown();
    }
}
//任务一完成
//主线程开始工作.....
//任务二完成

Semaphore控制线程数量

Semaphore经常用于限制获取某种资源的线程数量,其内部是基于AQS的共享模式,AQS的状态可以表示许可证的数量,许可证数量不够线程被挂起;而一旦有一个线程释放资源,那么可唤醒等待队列中的线程继续执行。

5b4741030001ee2d19201080.jpg (1920×1080)

Semaphore翻译过来就是信号量,Semaphore可以阻塞进程并控制同时访问的线程数,通过acquire()获取一个许可,如果没有就等待,而release()释放一个许可,Semaphore有点类似锁。

CountDownLatchSemaphore在使用时,通过和线程池配合使用。 Semaphore适合控制并发,CountDownLatch比较适合保证线程执行完后再执行其它处理,因此模拟并发两者结合最好。

Semaohore应用场景

Semaphore适合做流量控制,特别是共享的有限资源,比如数据库连接,假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控。

package com.rumenz.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


public class SemaphoreExample1 {
    private static Integer clientTotal=30;
    private static Integer threadTotal=3;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore=new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            final Integer j=i;
            executorService.execute(()->{
                try{
                    semaphore.acquire(); // 获取一个许可
                    update(j);
                    semaphore.release(); // 释放一个许可

                }catch (Exception e) {
                    e.printStackTrace();
                }

            });
            
        }
        executorService.shutdown();
    }

    private static void update(Integer j) throws Exception {
        System.out.println(j);
        Thread.sleep(2000);

    }
}

每2秒打印3个数字。

package com.rumenz.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


public class SemaphoreExample1 {
    private static Integer clientTotal=30;
    private static Integer threadTotal=3;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore=new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            final Integer j=i;
            executorService.execute(()->{
                try{
                    semaphore.acquire(3); // 获取多个许可
                    update(j);
                    semaphore.release(3); // 释放多个许可

                }catch (Exception e) {
                    e.printStackTrace();
                }

            });
            
        }
        executorService.shutdown();
    }

    private static void update(Integer j) throws Exception {
        System.out.println(j);
        Thread.sleep(2000);

    }
}

每2秒打印一个数字。

tryAcquire

尝试获取许可,如果获取不成功,则放弃操作,tryAcquire方法提供几个重载

  • tryAcquire() : boolean
  • tryAcquire(int permits) : boolean 尝试获取指定数量的许可
  • tryAcquire(int permits,long timeout,TimeUnit timeUnit) : boolean
  • tryAcquire(long timeout,TimeUnit timeUnit) : boolean 尝试获取许可的时候可以等待一段时间,在指定时间内未获取到许可则放弃

Semaphore源码分析

Semaphore有两种模式,公平模式和非公平模式。公平模式就是调用acquire的顺序就是获取许可证的顺序,遵循FIFO;而非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程。

// 非公平模式
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// fair=true为公平模式,false=非公平模式
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

public class Semaphore implements java.io.Serializable {
    /*
     * 只指定许可量,构造不公平模式
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
   
    /*
     * 指定许可量,并指定模式
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    
    //Semaphore内部基于AQS的共享模式,所以实现都委托给了Sync类。 
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    
        /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            // 可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            // 可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    
}
wx.jpg

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java多线程打辅助的三个小伙子

    之前学多线程的时候没有学习线程的同步工具类(辅助类)。ps:当时觉得暂时用不上,认为是挺高深的知识点就没去管了..

    Java3y
  • Java同步容器

    Vector实现List接口,底层和ArrayList类似,但是Vector中的方法都是使用synchronized修饰,即进行了同步的措施。 但是,Vecto...

    入门小站
  • 【小家java】JUC并发编程工具之CountDownLatch(闭锁)、CyclicBarrier、Semaphore的使用

    CountDownLatch:是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。

    YourBatman
  • Java高并发编程基础三大利器之CountDownLatch

    上一篇文章我们介绍了AQS的信号量Semaphore《Java高并发编程基础三大利器之Semaphore》,接下来应该轮到CountDownLatch了。

    java金融
  • Java高并发编程基础三大利器之CountDownLatch

    上一篇文章我们介绍了AQS的信号量Semaphore《Java高并发编程基础三大利器之Semaphore》,接下来应该轮到CountDownLatch了。

    java金融
  • Java同步组件之CyclicBarrier,ReentrantLock

    入门小站
  • (81) 并发同步协作工具 / 计算机程序的思维逻辑

    查看历史文章,请点击上方链接关注公众号。 我们在67节和68节实现了线程的一些基本协作机制,那是利用基本的wait/notify实现的,我们提到,Java并发包...

    swiftma
  • 搞懂这几个锁用法,多线程就懂一半了

    synchronized机制是给共享资源上锁,只有拿到锁的线程才可以访问共享资源,这样就可以强制使得对共享资源的访问都是顺序的。

    java乐园
  • ​Java 并发包提供了哪些并发工具类

    假设有10个人排队,我们将其分成5个人一批,使用CountDownLatc 来协调。

    王小明_HIT
  • Java并发:Semaphore信号量源码分析

    JUC 中 Semaphore 的使用与原理分析,Semaphore 也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarri...

    搜云库技术团队
  • 面经手册 · 第18篇《AQS 共享锁,Semaphore、CountDownLatch,听说数据库连接池可以用到!》

    其实并没有一天的突飞猛进,也没有一口吃出来的胖子。有得更多的时候日积月累、不断沉淀,最后才能爆发、破局!

    小傅哥
  • Java并发包提供了哪些并发工具类?

    我们通常所说的并发包也就是 java.util.concurrent 及其子包,集中了 Java 并发的各种基础工具类,具体主要包括几个方面

    葆宁
  • 最强Java并发编程详解:知识点梳理,BAT面试题等

    在JUC锁: AbstractQueuedSynchonizer详解中类的内部类-conditionobject类有具体分析。

    Java团长
  • Java 各种锁的小结

    从 JDK 1.6 开始,synchronized 做了很多优化,如偏向锁、轻量级锁、自旋锁、适应性自旋锁、锁消除、锁粗化等技术来减少锁操作的开销。

    芋道源码
  • Java并发工具那些事儿

    Java并发工具类主要有CyclicBarrier、CountDownLatch、Semaphore和Exchanger,日常开发中经常使用的是CountDow...

    luoxn28
  • 三个好用的并发工具类

    以前的文章中,我们介绍了太多的底层原理技术以及新概念,本篇我们轻松点,了解下 Java 并发包下、基于这些底层原理的三个框架工具类。

    Single
  • CountDownLatch、CyclicBarrier、Semaphore的区别,你知道吗?

    从结果可以看出,当四个线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable。

    好好学java
  • Java多线程并发工具类-信号量Semaphore对象讲解

    通过前面的学习,我们已经知道了Java多线程并发场景中使用比较多的两个工具类:做加法的CycliBarrier对象以及做减法的CountDownLatch对象并...

    凯哥Java
  • Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

    Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

    yaphetsfang

扫码关注云+社区

领取腾讯云代金券