专栏首页wOw的Android小站[Java] CountDownLatch 与 CyclicBarrier

[Java] CountDownLatch 与 CyclicBarrier

CountDownLatch

CountDownLatch是什么

源码注释描述如下:

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

它是一个同步工具类,允许一个或多个线程一直等待,直到其他线程运行完成后再执行。

看下我遇到的代码:

final class DecodeThread extends Thread {

    private final MainActivity mActivity;
    private final CountDownLatch mHandlerInitLatch;
    private Handler mHandler;

    DecodeThread(MainActivity activity) {
        this.mActivity = activity;
        mHandlerInitLatch = new CountDownLatch(1);
    }

    Handler getHandler() {
        try {
            mHandlerInitLatch.await();
        } catch (InterruptedException ie) {
            // continue?
        }
        return mHandler;
    }

    @Override
    public void run() {
        Looper.prepare();
        mHandler = new DecodeHandler(mActivity);
        mHandlerInitLatch.countDown();
        Looper.loop();
    }
}

CountDownLatch 工作原理

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量(上面代码构造线程数为1)。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

CountDownLatch的构造函数中的count就是闭锁需要等待的线程数量。这个值只能被设置一次,而且不能重新设置。

主线程必须在启动其他线程后立即调用CountDownLatch.await方法,这样主线程就会在这个方法上阻塞,知道其他线程完成各自任务。

其他线程完成任务后必须各自通知CountDownLatch对象,使其调用countDown方法。当count值为0时,主线程就能通过await方法恢复自己的任务。

简述其执行流程:

  1. 运行主线程
  2. 创建N个线程的CountDownLatch
  3. 创建启动N个线程
  4. 主线程运行CountDownLatch.await(),等待latch
  5. N个线程运行完毕,latch计数减到0
  6. 主线程恢复运行

再看上面的代码,主线程在getHandler处等待latch,在run处创建handler后执行latch.countDown,就是为了在get的时候能拿到非空的handler。

使用场景

  1. 实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
  2. 开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了。
  3. 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。

例子

网上看的一个例子

// 基础checker类
import java.util.concurrent.CountDownLatch;

public abstract class BaseHealthChecker implements Runnable {

	private CountDownLatch _latch;
	private String _serviceName;
	private boolean _serviceUp;

	// Get latch object in constructor so that after completing the task, thread
	// can countDown() the latch
	public BaseHealthChecker(String serviceName, CountDownLatch latch) {
		// TODO Auto-generated constructor stub
		super();
		this._latch = latch;
		this._serviceName = serviceName;
		this._serviceUp = false;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		try {
			verifyService();
			_serviceUp = true;
		} catch (Throwable t) {
			// TODO: handle exception
			t.printStackTrace(System.err);
			_serviceUp = false;
		} finally {
			if (_latch != null) {
				_latch.countDown();
			}
		}
	}

	public String getServiceName() {
		return _serviceName;
	}

	public boolean isServiceUp() {
		return _serviceUp;
	}

	// This methos needs to be implemented by all specific service checker
	public abstract void verifyService();
}

然后创建三个checker,分别是CacheChecker,DatabaseChecker和NetworkChecker

import java.util.concurrent.CountDownLatch;


public class CacheHealthChecker extends BaseHealthChecker {
	
    public CacheHealthChecker (CountDownLatch latch)  {
        super("Cache Service", latch);
    }
 
    @Override
    public void verifyService()
    {
        System.out.println("Checking " + this.getServiceName());
        try
        {
            Thread.sleep(6000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}

创建一个启动类,处理checker的检测

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


public class ApplicationStartupUtil {
	//List of service checkers
    private static List<BaseHealthChecker> _services;
 
    //This latch will be used to wait on
    private static CountDownLatch _latch;
 
    private ApplicationStartupUtil()
    {
    }
 
    private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();
 
    public static ApplicationStartupUtil getInstance()
    {
        return INSTANCE;
    }
 
    public static boolean checkExternalServices() throws Exception
    {
        //Initialize the latch with number of service checkers
        _latch = new CountDownLatch(3);
 
        //All add checker in lists
        _services = new ArrayList<BaseHealthChecker>();
        _services.add(new NetworkHealthChecker(_latch));
        _services.add(new CacheHealthChecker(_latch));
        _services.add(new DatabaseHealthChecker(_latch));
 
        //Start service checkers using executor framework
        Executor executor = Executors.newFixedThreadPool(_services.size());
 
        for(final BaseHealthChecker v : _services)
        {
            executor.execute(v);
        }
 
        //Now wait till all services are checked
        _latch.await();
 
        //Services are file and now proceed startup
        for(final BaseHealthChecker v : _services)
        {
            if( ! v.isServiceUp())
            {
                return false;
            }
        }
        return true;
    }
}

在主程序中运行检测

public class main {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		boolean result = false;
        try {
            result = ApplicationStartupUtil.checkExternalServices();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("External services validation completed !! Result was :: "+ result);

	}
}

运行结果:

Checking Network Service Checking Cache Service Checking Database Service Database Service is UP Cache Service is UP Network Service is UP External services validation completed !! Result was :: true

CyclicBarrier

CyclicBarrier 定义

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

也是一个同步工具类,它是让一组线程相互等待进入barrier状态,然后这组线程再执行。

初看定义可能有些懵,看个示例代码就清楚了。

示例

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}

示例中创建了监控4个线程的CyclicBarrier(4),然后启动4个线程,每个线程都持有cyclicBarrier对象。

在每个线程执行一半时,运行cyclicBarrier.await(),此时CyclicBarrier就会进行+1计数,当前线程被阻塞。当计数达到4时,解除阻塞,所有线程都继续执行。

所以运行结果是:

线程Thread-0正在写入数据… 线程Thread-3正在写入数据… 线程Thread-2正在写入数据… 线程Thread-1正在写入数据… 线程Thread-2写入数据完毕,等待其他线程写入完毕 线程Thread-0写入数据完毕,等待其他线程写入完毕 线程Thread-3写入数据完毕,等待其他线程写入完毕 线程Thread-1写入数据完毕,等待其他线程写入完毕 所有线程写入完毕,继续处理其他任务… 所有线程写入完毕,继续处理其他任务… 所有线程写入完毕,继续处理其他任务… 所有线程写入完毕,继续处理其他任务…

要点

  • 构造方法有两种

CyclicBarrier(int parties) 默认构造方法,参数表示拦截的线程数量。

CyclicBarrier(int parties, Runnable barrierAction) 由于线程之前的调度是由CPU决定的,所以默认的构造方法无法设置线程执行优先级,CyclicBarrier提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达同步点时,优先执行线程barrierAction,这样可以更加方便的处理一些负责的业务场景。

  • await实现

CyclicBarrier同样提供带超时时间的await和不带超时时间的await。如果指定了时间,在时间内某个线程还未await,就抛出异常,所有线程继续执行后续任务。

  • reset功能

reset可以使其不断地复用

两者区别

CountDownLatch

CyclicBarrier

减计数方式

加计数方式

计算为0时释放所有等待的线程

计数达到指定值时释放所有等待线程

计数为0时,无法重置

计数达到指定值时,计数置为0重新开始

调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响

调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞

不可重复利用

可重复利用

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • [Objective-C]多线程和GCD

    是指在系统中正在运行的一个应用程序。 每个进程之间是独立的,每个进程均运行在其专用且受保护的内存空间内。 比如同时打开QQ、Xcode,系统就会分别启动两个进程...

    wOw
  • [Protocol Buffer]Java使用Protocol Buffer

    下面一个例子是一个简单的通讯录,可以读写人物信息到文件。每个人都有姓名,ID,email和电话号码。

    wOw
  • [设计模式]之十三:抽象工厂模式

    抽象工厂模式与工厂方法模式的最大区别就在于,工厂方法模式针对的是一个产品等级结构;而抽象工厂模式则需要面对多个产品等级结构。

    wOw
  • 基于Zmq的后台通信模型介绍

    Zmq是一个简单好用的传输组建,使得socket变成更加简洁、高效、高性能。本文主要介绍后台服务实现、多线程任务实现、线程无锁计数实现。

    airingzeng
  • IOCP异步优化

    2. IO操作: CPU会把内存中的程序委托给其他的网络、磁盘等驱动程序,让这些外部的驱动程序来进行具体的处理,处理完成以后再返回给内存程序。对于这两类操作的优...

    小蜜蜂
  • 【转】Java并发的AQS原理详解

    每一个 Java 的高级程序员在体验过多线程程序开发之后,都需要问自己一个问题,Java 内置的锁是如何实现的?最常用的最简单的锁要数 ReentrantL...

    一枝花算不算浪漫
  • C# 基础知识系列- 12 任务和多线程

    照例一份前言,在介绍任务和多线程之前,先介绍一下异步和同步的概念。我们之间介绍的知识点都是在同步执行,所谓的同步就是一行代码一行代码的执行,就像是我们日常乘坐地...

    程序员小高
  • 一篇搞懂线程池

    在上一篇文章《spring boot使用@Async异步任务》中我们了解了使用@Async的异步任务使用,在这篇文章中我们将学习使用线程池来创建异步任务的线程。

    小森啦啦啦
  • JAVA初级岗面试知识点——基础篇

    14、short s1 = 1; s1 = s1 + 1;有错吗?short s1 = 1; s1 += 1;有错吗?

    C you again 的博客
  • 打通 Java 任督二脉 —— 并发数据结构的基石

    每一个 Java 的高级程序员在体验过多线程程序开发之后,都需要问自己一个问题,Java 内置的锁是如何实现的?最常用的最简单的锁要数 ReentrantLoc...

    老钱

扫码关注云+社区

领取腾讯云代金券