专栏首页ytaoJava 多线程中使用 JDK 自带工具类实现计数器

Java 多线程中使用 JDK 自带工具类实现计数器

前言

在实际开发过程中,经常遇到需要多线程并行的业务,最后需要进行将各个线程完成的任务进行汇总,但主线程一般会早于子线程结束,如果要想等各个子线程完成后再继续运行主线程,这时就需要对各个线程是否执行完成进行标识,JDK 并发包中就给开发者提供了几个不错的使用工具类。

接下来将通过 Thread#join 方法以及 CountDownLatch、CyclicBarrier 类进行上面案例方案的分析。

Thread#join 方法

使用 join() 方法的子线程对象正常执行 run() 中代码,但当前线程会被无超时阻塞,等待执行 join() 方法的线程销毁后,继续执行被阻塞的当前线程。join() 方法阻塞原理是该方法内使用 wait() 方法阻塞,源码如下所示:

子线程 join() 完成时会调用 notifyAll() 来通知当前线程继续执行接下来的代码。

假如现在有两个线程产生数据结果,最后将两个线程结果进行相加,如果直接将两个线程执行并进行汇总,如下实现代码:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * htpps://ytao.top
 *
 * Created by YangTao on 2020/5/17 0017.
 */
public class JoinTest {


    public static void main(String[] args) throws InterruptedException {

        Map<String, Integer> map = new ConcurrentHashMap<>();

        Thread thread1 = new Thread(() -> {
            map.put("thread1", 1);
            System.out.println("run thread1");
        });

        Thread thread2 = new Thread(() -> {
            map.put("thread2", 2);
            System.out.println("run thread2");
        });


        thread1.start();
        thread2.start();

        System.out.println(map.get("thread1") + map.get("thread2"));

        System.out.println("end....");

    }
}

执行结果:

由于主线程的汇总计算可能早于子线程完成,所以这时获取子线程结果为空指针异常。

通过增加 join() 方法实现阻塞主线程,等待子线程完成后再进行汇总:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * htpps://ytao.top
 *
 * Created by YangTao on 2020/5/17 0017.
 */
public class JoinTest {


    public static void main(String[] args) throws InterruptedException {

        Map<String, Integer> map = new ConcurrentHashMap<>();

        Thread thread1 = new Thread(() -> {
            map.put("thread1", 1);
            System.out.println("run thread1");
        });

        Thread thread2 = new Thread(() -> {
            map.put("thread2", 2);
            System.out.println("run thread2");
        });


        thread1.start();
        thread2.start();

        // 两个线程分别调用 join() 方法,使主线程被阻塞
        thread1.join();
        thread2.join();

        System.out.println(map.get("thread1") + map.get("thread2"));

        System.out.println("end....");

    }
}

执行结果为:

通过结果可以看到子线程汇总求和为 3。此时主线程在两个子线程销毁前都处于等待状态,直至两个销毁后主线程再执行汇总求和,所以两个线程产生的值都已存在。

同时,子线程 join() 方法可以使当前线程无期限等待,也可以设置最长等待时长 join(long) 方法,无论子线程是否执行完成,当前线程会继续执行后面代码。使用方法加入超时参数即可,其它与 join() 方法使用相同。

CountDownLatch

CountDownLatch 可以使一个或多个线程等待其他线程完成操作后再继续执行当前线程后面代码。

CountDownLatch 的使用:首先创建 CountDownLatch 对象,通过传入参数 int 构造 CountDownLatch 对象。该参数是值将要等待的执行点的数量。

CountDownLatch 中有几个方法:

  • getCount() 返回当前计数器数,即当前剩余的等待数量。官方解释说该方法通常用于调试和测试目的。
  • countDown 每调用一次,计数器便会进行减 1 操作,但计数器必须大于 0。
  • await 该方法会阻塞当前线程,直至计数器为 0 时,就会不再阻塞当前线程。同时也提供 await(long timeout, TimeUnit unit) 方法,可设置超时时间。

利用 CountDownLatch 实现汇总求和案例,实现代码如下:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * https://ytao.top
 *
 * Created by YangTao on 2020/5/17 0017.
 */
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {

        Map<String, Integer> map = new ConcurrentHashMap<>();

        CountDownLatch count = new CountDownLatch(2);

        Thread thread1 = new Thread(() -> {
            map.put("thread1", 1);
            System.out.println("run thread1");
            count.countDown();
        });

        Thread thread2 = new Thread(() -> {
            map.put("thread2", 2);
            System.out.println("run thread2");
            count.countDown();
        });


        thread1.start();
        thread2.start();

        // 一直阻塞当前线程,直至计数器为 0
        count.await();

        System.out.println(map.get("thread1") + map.get("thread2"));

        System.out.println("end.... getCount: " + count.getCount());
    }

}

执行结果如下:

上图中求和结果为 3,同时计数器为 0。

通过查看 CountDownLatch 源码,主要是通过一个继承 AbstractQueuedSynchronizer 类的内部类 Sync 来实现的,可知其实现原理为 AQS,这里不进行展开讲述。

CyclicBarrier

CyclicBarrier 是一个可循环使用的屏障。实现原理解释,就是在一个或多个线程运行中设置一个屏障,线程到达这个屏障时会被阻塞,直到最后一个线程到达时,被屏障阻塞的线程继续执行。

CyclicBarrier 构造方法有两个, CyclicBarrier(intcount)CyclicBarrier(intcount,RunnablebarrierAction):

  • 单个 int参数构造方法,表示构造到达屏障线程的数量。
  • 一个 int和一个 Runnable参数构造方法,前者参数表示到达屏障线程的数量,后者参数表示所有线程到达屏障后接下来要执行的代码;

CyclicBarrier 中方法:

方法

说明

await()

阻塞前线程,等待 trip.signal() 或 trip.signalAll() 方法唤醒

await(long, TimeUnit)

在 await() 上增加两个参数,等待超时时间 timeout,单位为 unit

breakBarrier()

放开屏障,设置标志,唤醒被屏障阻塞的线程

isBroken()

阻塞的线程是否被中断

reset()

重置 CyclicBarrier 对象

getNumberWaiting()

当前被阻塞线程的数量

getParties()

到达屏障的线程总数量,即创建时指定的数量

使用 CyclicBarrier 实现上面汇总:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * https://ytao.top
 *
 * Created by YangTao on 2020/5/17 0017.
 */
public class CyclicBarrierTest {

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {

        Map<String, Integer> map = new ConcurrentHashMap<>();

        CyclicBarrier barrier = new CyclicBarrier(2, new Thread(()->{
            // 所有线程到达屏障后,需要执行的代码
            System.out.println(map.get("thread1") + map.get("thread2"));
            System.out.println("CyclicBarrier end.... ");
        }));

        Thread thread1 = new Thread(() -> {
            map.put("thread1", 1);
            System.out.println("run thread1");
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        });

        Thread thread2 = new Thread(() -> {
            map.put("thread2", 2);
            System.out.println("run thread2");
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        });

        thread1.start();
        thread2.start();

    }
}

执行结果:

执行完两条子线程,并且在子线程里调用 barrier.await()后,屏障被打开,最后执行 CyclicBarrier 的最后的代码逻辑。

通过上面 CyclicBarrier 的方法可知,CyclicBarrier 比 CountDownLatch 使用更加灵活,CyclicBarrier 的 reset() 方法可以重置计数器,而 CountDownLatch 则只能使用一次。同时,CyclicBarrier 拥有更多线程阻塞信息的方法提供使用,在使用过程中,提供更加灵活的使用方式。

总结

上面三种方式,均由 JDK 的并发包中提供的工具。在多线程协作任务中,对计数器场景问题的解决方案,实现 main 线程对 worker 线程的等待完成。在实际开发应用中,使用频率也是非常之高。

关注【ytao】,更多原创好文

本文分享自微信公众号 - ytao(ytao-blog),作者:ytao

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-05-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java 线程基础,从这篇开始

    一个独立运行的程序是一个进程,一个进程中可以包含一个或多个线程,每个线程都有属于自己的一些属性,如堆栈,计数器等等。同时,一个线程在一个时间点上只能运行在一个 ...

    ytao
  • volatile 手摸手带你解析

    volatile 是 Java 里的一个重要的指令,它是由 Java 虚拟机里提供的一个轻量级的同步机制。一个共享变量声明为 volatile 后,特别是在多线...

    ytao
  • Java 线程通信之 wait/notify 机制

    Java 线程通信是将多个独立的线程个体进行关联处理,使得线程与线程之间能进行相互通信。比如线程 A 修改了对象的值,然后通知给线程 B,使线程 B 能够知道线...

    ytao
  • ThreadPoolExecutor源码分析

    ThreadPoolExecutor继承AbstractExecutorService,层级实现了ExecutorService,ExecutorService...

    冰枫
  • java 线程 Thread 使用介绍,包含wait(),notifyAll() 等函数使用介绍

    (原创,转载请说明出处!谢谢--https://cloud.tencent.com/developer/user/1148436/activities)  此文...

    林冠宏-指尖下的幽灵
  • 【Android面试】关于多线程,你必须知道的那些玩意儿

    进程和线程作为必知必会的知识,想来读者们也都是耳熟能详了,但真的是这样嘛?今天我们就来重新捋一捋,看看有没有什么知识点欠缺的。

    Android技术干货分享
  • 关于多线程,你必须知道的那些玩意儿

    进程和线程作为必知必会的知识,想来读者们也都是耳熟能详了,但真的是这样嘛?今天我们就来重新捋一捋,看看有没有什么知识点欠缺的。

    字节流动
  • 关于多线程,你必须知道的那些玩意儿

    进程和线程作为必知必会的知识,想来读者们也都是耳熟能详了,但真的是这样嘛?今天我们就来重新捋一捋,看看有没有什么知识点欠缺的。

    ClericYi
  • 【JMeter-3】JMeter参数化4种实现方式

    什么是参数化?从字面上去理解的话,就是事先准备好数据(广义上来说,可以是具体的数据值,也可以是数据生成规则),而非在脚本中写死,脚本执行时从准备好的数据中取值。

    云深i不知处
  • java基础thread——多线程的纷争(循序渐进)

    正在运行的程序,是系统进行资源分配和调用的独立单位。 每一个进程都有它自己的内存空间和系统资源。

    100000860378

扫码关注云+社区

领取腾讯云代金券