前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[Java] CountDownLatch 与 CyclicBarrier

[Java] CountDownLatch 与 CyclicBarrier

作者头像
wOw
发布2018-09-18 14:55:45
3940
发布2018-09-18 14:55:45
举报
文章被收录于专栏:wOw的Android小站wOw的Android小站

CountDownLatch

CountDownLatch是什么

源码注释描述如下:

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

它是一个同步工具类,允许一个或多个线程一直等待,直到其他线程运行完成后再执行。

看下我遇到的代码:

代码语言:javascript
复制
final class DecodeThread extends Thread {

    private final MainActivity mActivity;
    private final CountDownLatch mHandlerInitLatch;
    private Handler mHandler;

    DecodeThread(MainActivity activity) {
        this.mActivity = activity;
        mHandlerInitLatch = new CountDownLatch(1);
    }

    Handler getHandler() {
        try {
            mHandlerInitLatch.await();
        } catch (InterruptedException ie) {
            // continue?
        }
        return mHandler;
    }

    @Override
    public void run() {
        Looper.prepare();
        mHandler = new DecodeHandler(mActivity);
        mHandlerInitLatch.countDown();
        Looper.loop();
    }
}

CountDownLatch 工作原理

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量(上面代码构造线程数为1)。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

CountDownLatch的构造函数中的count就是闭锁需要等待的线程数量。这个值只能被设置一次,而且不能重新设置。

主线程必须在启动其他线程后立即调用CountDownLatch.await方法,这样主线程就会在这个方法上阻塞,知道其他线程完成各自任务。

其他线程完成任务后必须各自通知CountDownLatch对象,使其调用countDown方法。当count值为0时,主线程就能通过await方法恢复自己的任务。

简述其执行流程:

  1. 运行主线程
  2. 创建N个线程的CountDownLatch
  3. 创建启动N个线程
  4. 主线程运行CountDownLatch.await(),等待latch
  5. N个线程运行完毕,latch计数减到0
  6. 主线程恢复运行

再看上面的代码,主线程在getHandler处等待latch,在run处创建handler后执行latch.countDown,就是为了在get的时候能拿到非空的handler。

使用场景

  1. 实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
  2. 开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了。
  3. 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。

例子

网上看的一个例子

代码语言:javascript
复制
// 基础checker类
import java.util.concurrent.CountDownLatch;

public abstract class BaseHealthChecker implements Runnable {

	private CountDownLatch _latch;
	private String _serviceName;
	private boolean _serviceUp;

	// Get latch object in constructor so that after completing the task, thread
	// can countDown() the latch
	public BaseHealthChecker(String serviceName, CountDownLatch latch) {
		// TODO Auto-generated constructor stub
		super();
		this._latch = latch;
		this._serviceName = serviceName;
		this._serviceUp = false;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		try {
			verifyService();
			_serviceUp = true;
		} catch (Throwable t) {
			// TODO: handle exception
			t.printStackTrace(System.err);
			_serviceUp = false;
		} finally {
			if (_latch != null) {
				_latch.countDown();
			}
		}
	}

	public String getServiceName() {
		return _serviceName;
	}

	public boolean isServiceUp() {
		return _serviceUp;
	}

	// This methos needs to be implemented by all specific service checker
	public abstract void verifyService();
}

然后创建三个checker,分别是CacheChecker,DatabaseChecker和NetworkChecker

代码语言:javascript
复制
import java.util.concurrent.CountDownLatch;


public class CacheHealthChecker extends BaseHealthChecker {
	
    public CacheHealthChecker (CountDownLatch latch)  {
        super("Cache Service", latch);
    }
 
    @Override
    public void verifyService()
    {
        System.out.println("Checking " + this.getServiceName());
        try
        {
            Thread.sleep(6000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}

创建一个启动类,处理checker的检测

代码语言:javascript
复制
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


public class ApplicationStartupUtil {
	//List of service checkers
    private static List<BaseHealthChecker> _services;
 
    //This latch will be used to wait on
    private static CountDownLatch _latch;
 
    private ApplicationStartupUtil()
    {
    }
 
    private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();
 
    public static ApplicationStartupUtil getInstance()
    {
        return INSTANCE;
    }
 
    public static boolean checkExternalServices() throws Exception
    {
        //Initialize the latch with number of service checkers
        _latch = new CountDownLatch(3);
 
        //All add checker in lists
        _services = new ArrayList<BaseHealthChecker>();
        _services.add(new NetworkHealthChecker(_latch));
        _services.add(new CacheHealthChecker(_latch));
        _services.add(new DatabaseHealthChecker(_latch));
 
        //Start service checkers using executor framework
        Executor executor = Executors.newFixedThreadPool(_services.size());
 
        for(final BaseHealthChecker v : _services)
        {
            executor.execute(v);
        }
 
        //Now wait till all services are checked
        _latch.await();
 
        //Services are file and now proceed startup
        for(final BaseHealthChecker v : _services)
        {
            if( ! v.isServiceUp())
            {
                return false;
            }
        }
        return true;
    }
}

在主程序中运行检测

代码语言:javascript
复制
public class main {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		boolean result = false;
        try {
            result = ApplicationStartupUtil.checkExternalServices();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("External services validation completed !! Result was :: "+ result);

	}
}

运行结果:

Checking Network Service Checking Cache Service Checking Database Service Database Service is UP Cache Service is UP Network Service is UP External services validation completed !! Result was :: true

CyclicBarrier

CyclicBarrier 定义

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

也是一个同步工具类,它是让一组线程相互等待进入barrier状态,然后这组线程再执行。

初看定义可能有些懵,看个示例代码就清楚了。

示例

代码语言:javascript
复制
public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}

示例中创建了监控4个线程的CyclicBarrier(4),然后启动4个线程,每个线程都持有cyclicBarrier对象。

在每个线程执行一半时,运行cyclicBarrier.await(),此时CyclicBarrier就会进行+1计数,当前线程被阻塞。当计数达到4时,解除阻塞,所有线程都继续执行。

所以运行结果是:

线程Thread-0正在写入数据… 线程Thread-3正在写入数据… 线程Thread-2正在写入数据… 线程Thread-1正在写入数据… 线程Thread-2写入数据完毕,等待其他线程写入完毕 线程Thread-0写入数据完毕,等待其他线程写入完毕 线程Thread-3写入数据完毕,等待其他线程写入完毕 线程Thread-1写入数据完毕,等待其他线程写入完毕 所有线程写入完毕,继续处理其他任务… 所有线程写入完毕,继续处理其他任务… 所有线程写入完毕,继续处理其他任务… 所有线程写入完毕,继续处理其他任务…

要点

  • 构造方法有两种

CyclicBarrier(int parties) 默认构造方法,参数表示拦截的线程数量。

CyclicBarrier(int parties, Runnable barrierAction) 由于线程之前的调度是由CPU决定的,所以默认的构造方法无法设置线程执行优先级,CyclicBarrier提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达同步点时,优先执行线程barrierAction,这样可以更加方便的处理一些负责的业务场景。

  • await实现

CyclicBarrier同样提供带超时时间的await和不带超时时间的await。如果指定了时间,在时间内某个线程还未await,就抛出异常,所有线程继续执行后续任务。

  • reset功能

reset可以使其不断地复用

两者区别

CountDownLatch

CyclicBarrier

减计数方式

加计数方式

计算为0时释放所有等待的线程

计数达到指定值时释放所有等待线程

计数为0时,无法重置

计数达到指定值时,计数置为0重新开始

调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响

调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞

不可重复利用

可重复利用

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-05-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CountDownLatch
    • CountDownLatch是什么
      • CountDownLatch 工作原理
        • 使用场景
          • 例子
          • CyclicBarrier
            • CyclicBarrier 定义
              • 示例
                • 要点
                • 两者区别
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档