Java的并发工具类汇总

前言

Java给我们提供了很多通用且好用的并发工具类,现在我就总结一下

Atomic包

更新基本类型变量

Atomic包原子更新基本类型的工具类:

AtomicBoolean:以原子更新的方式更新boolean; AtomicInteger:以原子更新的方式更新Integer;

AtomicLong:以原子更新的方式更新Long;

这几个类的用法基本一致,这里以AtomicInteger为例总结常用的方法

addAndGet(int delta) :以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果; incrementAndGet() :以原子的方式将实例中的原值进行加1操作,并返回最终相加后的结果; getAndSet(int newValue):将实例中的值更新为新值,并返回旧值;

getAndIncrement():以原子的方式将实例中的原值加1,返回的是自增前的旧值;

其他它的底层是运用了CAS的原理,具体还各位可以去看源码,比如我就举一个例子

    public final int getAndIncrement() {

        return unsafe.getAndAddInt(this, valueOffset, 1);

    }


    private static final Unsafe unsafe = Unsafe.getUnsafe();

更新数组类型变量

atomic包下提供能原子更新数组中元素的类有:

AtomicIntegerArray:原子更新整型数组中的元素; AtomicLongArray:原子更新长整型数组中的元素; AtomicReferenceArray:原子更新引用类型数组中的元素

这几个类的用法一致,就以AtomicIntegerArray来总结下常用的方法:

addAndGet(int i, int delta):以原子更新的方式将数组中索引为i的元素与输入值相加; getAndIncrement(int i):以原子更新的方式将数组中索引为i的元素自增加1; compareAndSet(int i, int expect, int update):将数组中索引为i的位置的元素进行更新

更新引用类型变量

如果需要原子更新引用类型变量的话,为了保证线程安全,atomic也提供了相关的类:

AtomicReference:原子更新引用类型; AtomicReferenceFieldUpdater:原子更新引用类型里的字段;

AtomicMarkableReference:原子更新带有标记位的引用类型;

这几个类的使用方法也是基本一样的,以AtomicReference为例,来说明这些类的基本用法。

更新对象的某个字段

如果需要更新对象的某个字段,并在多线程的情况下,能够保证线程安全,atomic同样也提供了相应的原子操作类:

AtomicIntegeFieldUpdater:原子更新整型字段类; AtomicLongFieldUpdater:原子更新长整型字段类; AtomicStampedReference:原子更新引用类型,这种更新方式会带有版本号。 而为什么在更新的时候会带有版本号,是为了解决CAS的ABA问题;

要想使用原子更新字段需要两步操作:

原子更新字段类都是抽象类,只能通过静态方法newUpdater来创建一个更新器,并且需要设置想要更新的类和属性; 更新类的属性必须使用public volatile进行修饰;

CountDownLatch

CountDownLatch是一个非常实用的多线程控制工具类。常用下面几个方法:

    CountDownLatch(int count) //实例化一个倒计数器,count指定计数个数

    countDown() // 计数减一

    await() //等待,当计数减到0时,所有线程并行执行

CountDownLatch在我工作的多个场景被使用,算是用的很频繁的了,比如我们的API接口响应时间被要求在200ms以内,但是如果一个接口内部依赖多个三方外部服务,那串行调用接口的RT必然很久,所以个人用的最多的是接口RT优化场景,内部服务并行调用。

对于倒计数器,一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检测。只有等到所有的检查完毕后,引擎才能点火。那么在检测环节当然是多个检测项可以同时进行的。代码实现:

    public class CountDownLatchDemo implements Runnable{


        static final CountDownLatch latch = new CountDownLatch(10);

        static final CountDownLatchDemo demo = new CountDownLatchDemo();


        @Override

        public void run() {

            // 模拟检查任务

            try {

                Thread.sleep(new Random().nextInt(10) * 1000);

                System.out.println("check complete");

            } catch (InterruptedException e) {

                e.printStackTrace();

            } finally {

                //计数减一

                //放在finally避免任务执行过程出现异常,导致countDown()不能被执行

                latch.countDown();

            }

        }



        public static void main(String[] args) throws InterruptedException {

            ExecutorService exec = Executors.newFixedThreadPool(10);

            for (int i=0; i<10; i++){

                exec.submit(demo);

            }


            // 等待检查

            latch.await();


            // 发射火箭

            System.out.println("Fire!");

            // 关闭线程池

            exec.shutdown();

        }

    }

上述代码中我们先生成了一个CountDownLatch实例。计数数量为10,这表示需要有10个线程来完成任务,等待在CountDownLatch上的线程才能继续执行。latch.countDown();

方法作用是通知CountDownLatch有一个线程已经准备完毕,倒计数器可以减一了。latch.await()方法要求主线程等待所有10个检查任务全部准备好才一起并行执行。

CyclicBarrier

其实,有点类似人满发车这个词来理解CyclicBarrier的作用:

长途汽车站提供长途客运服务。当等待坐车的乘客到达20人时,汽车站就会发出一辆长途汽车,让这20个乘客上车走人。等到下次等待的乘客又到达20人是,汽车站就会又发出一辆长途汽车。

CyclicBarrier常用于多线程分组计算

下面来看下CyclicBarrier的主要方法:

    //等到所有的线程都到达指定的临界点

    await() throws InterruptedException, BrokenBarrierException 


    //与上面的await方法功能基本一致,只不过这里有超时限制,阻塞等待直至到达超时时间为止

    await(long timeout, TimeUnit unit) throws InterruptedException, 

    BrokenBarrierException, TimeoutException 


    //获取当前有多少个线程阻塞等待在临界点上

    int getNumberWaiting()


    //用于查询阻塞等待的线程是否被中断

    boolean isBroken()



    //将屏障重置为初始状态。如果当前有线程正在临界点等待的话,将抛出BrokenBarrierException。

    void reset()

另外需要注意的是,CyclicBarrier提供了这样的构造方法:

public CyclicBarrier(int parties, Runnable barrierAction)

下面用一个简单的例子来看下CyclicBarrier的用法,模拟下上面的坐车的例子。

    public class CyclicBarrierDemo {

        //指定必须有20个乘客到达才行

        private static CyclicBarrier barrier = new CyclicBarrier(20, () -> {

            System.out.println("所有乘客都上车了,司机开车!!!!!");

        });

        public static void main(String[] args) {

            System.out.println("乘客准备上车...........");


            ExecutorService service = Executors.newFixedThreadPool(6);

            for (int i = 0; i < 6; i++) {

                service.execute(() -> {

                    try {

                        System.out.println(Thread.currentThread().getName() + " 乘客,上车");

                        barrier.await();

                        System.out.println(Thread.currentThread().getName() + "  乘客,上车");

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    } catch (BrokenBarrierException e) {

                        e.printStackTrace();

                    }

                });

            }

        }


    }

CountDownLatch与CyclicBarrier比较

这两者还是各有不同侧重点的:

1、CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成,再携手共进。

2、调用CountDownLatch的countDown方法后,当前线程并不会阻塞,会继续往下执行;而调用CyclicBarrier的await方法,会阻塞当前线程,直到CyclicBarrier指定的线程全部都到达了指定点的时候,才能继续往下执行;

3、CountDownLatch方法比较少,操作比较简单,而CyclicBarrier提供的方法更多,比如能够通过getNumberWaiting(),isBroken()这些方法获取当前多个线程的状态,并且CyclicBarrier的构造方法可以传入barrierAction,指定当所有线程都到达时执行的业务功能;

4、CountDownLatch是不能复用的,而CyclicLatch是可以复用的

Semaphore

Semaphore叫信号量,Semaphore有两个目的,第一个是多个共享资源互斥使用,第二个是并发线程数的控制。

Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。假如有多个线程读取数据后,需要将数据保存在数据库中,而可用的最大数据库连接只有10个,这时候就需要使用Semaphore来控制能够并发访问到数据库连接资源的线程个数最多只有10个。

Semaphore的主要方法:

    //获取许可,如果无法获取到,则阻塞等待直至能够获取为止

    void acquire() throws InterruptedException 


    //同acquire方法功能基本一样,只不过该方法可以一次获取多个许可

    void acquire(int permits) throws InterruptedException


    //释放许可

    void release()


    //释放指定个数的许可

    void release(int permits)


    //尝试获取许可,如果能够获取成功则立即返回true,否则,则返回false

    boolean tryAcquire()


    //与tryAcquire方法一致,只不过这里可以指定获取多个许可

    boolean tryAcquire(int permits)


    //尝试获取许可,如果能够立即获取到或者在指定时间内能够获取到,则返回true,否则返回false

    boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException


    //与上一个方法一致,只不过这里能够获取多个许可

    boolean tryAcquire(int permits, long timeout, TimeUnit unit)


    //返回当前可用的许可证个数

    int availablePermits()


    //返回正在等待获取许可证的线程数

    int getQueueLength()


    //是否有线程正在等待获取许可证

    boolean hasQueuedThreads()


    //获取所有正在等待许可的线程集合

    Collection<Thread> getQueuedThreads()

Semaphore信号量demo

/**
 * Semaphore叫信号量 or 信号灯
 * Semaphore有两个目的,第一个目的是多个共享资源互斥使用,第二目的是并发线程数的控制
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        // 模拟厕所10个茅坑
        Semaphore semaphore = new Semaphore(5);
        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                try {
                    // 获取锁资源
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "\t上厕所");
                    // 模拟人上厕所10秒,然后让出坑位
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println(Thread.currentThread().getName() + "\t上完厕所,让出坑位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 释放锁资源
                    semaphore.release();
                }
            }, "" + i + "号帅哥").start();
        }
    }
}

线程间交换数据Exchanger

Exchanger是一个用于线程间协作的工具类,用于两个线程间能够交换。它提供了一个交换的同步点,在这个同步点两个线程能够交换数据。

具体交换数据是通过exchange方法来实现的,如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。

Exchanger除了一个无参的构造方法外,主要方法也很简单:

    //当一个线程执行该方法的时候,会等待另一个线程也执行该方法,因此两个线程就都达到了同步点

    //将数据交换给另一个线程,同时返回获取的数据

    V exchange(V x) throws InterruptedException


    //同上一个方法功能基本一样,只不过这个方法同步等待的时候,增加了超时时间

    V exchange(V x, long timeout, TimeUnit unit)

        throws InterruptedException, TimeoutException 

模拟这样一个情景,在青春洋溢的中学时代,下课期间,男生经常会给走廊里为自己喜欢的女孩子送情书,相信大家都做过这样的事情吧 :)。男孩会先到女孩教室门口,然后等女孩出来,教室那里就是一个同步点,然后彼此交换信物,也就是彼此交换了数据。现在,就来模拟这个情景。

    public class ExchangerDemo {

        private static Exchanger<String> exchanger = new Exchanger();


        public static void main(String[] args) {


            //代表男生和女生

            ExecutorService service = Executors.newFixedThreadPool(2);


            service.execute(() -> {

                try {

                    //男生对女生说的话

                    String girl = exchanger.exchange("我其实暗恋你很久了......");

                    System.out.println("女孩儿说:" + girl);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            });

            service.execute(() -> {

                try {

                    System.out.println("女生慢慢的从教室你走出来......");

                    TimeUnit.SECONDS.sleep(3);

                    //男生对女生说的话

                    String boy = exchanger.exchange("我也很喜欢你......");

                    System.out.println("男孩儿说:" + boy);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            });


        }

    }


    输出结果:


    女生慢慢的从教室你走出来......

    男孩儿说:我其实暗恋你很久了......

    女孩儿说:我也很喜欢你......

总结

暂时总结到这里,如果还有什么遗漏的,可以留言给我哈。

参考文章

https://www.relaxheart.cn/to/blog/streamline?uuid=83 https://juejin.im/post/5aeec3ebf265da0ba76fa327

本文分享自微信公众号 - 爱编码(ilovecode)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券