前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >CyclicBarrier和Condtion

CyclicBarrier和Condtion

作者头像
DioxideCN
发布2023-01-12 11:36:00
2990
发布2023-01-12 11:36:00
举报

深入浅出CyclicBarrier

CyclicBarrier的基本使用

基本概念

CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程达到一个屏障(也可以叫同步点)时被阻塞,知道最后一个线程打到屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作。

数据导入案例

代码语言:javascript
复制
public class Main extends Thread {
    @Override
    public void run() {
        System.out.println("开始执行数据汇总操作");
    }
	
    public static void main(String[] args) {
	    // parties: 3 必须要有 3 个线程参与进来才能继续
        CyclicBarrier cb = new CyclicBarrier(3, new Main());
        new CyclicBarrierDemo("/src/store/", cb).start();
        new CyclicBarrierDemo("/src/image/", cb).start();
        // 如果不满足 3 个则 cb 会一直阻塞直到第三个加入
        new CyclicBarrierDemo("/src/frame/", cb).start();
        // TODO 希望三个线程执行结束之后,再做一个汇总处理
    }
}
  
class CyclicBarrierDemo extends Thread {
    String path;  
    CyclicBarrier cyclicBarrier;  
	
    public CyclicBarrierDemo(String path, CyclicBarrier cyclicBarrier) {  
        this.path = path;  
        this.cyclicBarrier = cyclicBarrier;  
    }  
	
    @Override  
    public void run() {  
        System.out.println("开始导入数据: " + path);  
        try {  
            cyclicBarrier.await(); // 阻塞  
        } catch (InterruptedException | BrokenBarrierException e) {  
            throw new RuntimeException(e);  
        }  
    }  
}

得到如下的运行结果:

代码语言:javascript
复制
开始导入数据: /src/frame/
开始导入数据: /src/store/
开始导入数据: /src/image/
开始执行数据汇总操作

使用场景

CyclicBarrier 会在前提任务完成后再向下执行,所以当需要所有的子线程完成任务再执行主线程时,就可以选择使用 CyclicBarrier。例如:需要成功获取到所有图片,再进行展示;需要成功加载所有文件,再进行文件内容分析等。CyclicBarrier 具有以下性质:

  1. 如果指定了 parties 但又没有足够的线程来调用 await() 那就会导致所有线程都被阻塞住
  2. await(timeout, unit) 为了避免出现所有线程阻塞可以对 await 设置一个超时等待时间
  3. 同样可以使用 reset() 来重置计数,但会抛出 BrokenBarrierException 的异常

CyclicBarrier的原理分析

原理图

28e82e8f4934fd975ff16315967d12d6
28e82e8f4934fd975ff16315967d12d6

await()方法

代码语言:javascript
复制
private final Runnable barrierCommand; // 传入的第二个线程参数
private int count; // 对 parties 进行备份操作

private int dowait(boolean timed, long nanos)  
    throws InterruptedException, BrokenBarrierException,  
           TimeoutException {  
    final ReentrantLock lock = this.lock; // 获得一个重入锁 
    lock.lock();
    try {
	    // 构造一个周期
        final Generation g = generation;
		// 判断栅栏是否被打破
        if (g.broken)
            throw new BrokenBarrierException();
		// 判断线程是否被中断
        if (Thread.interrupted()) {
            breakBarrier(); // 将栅栏打破并抛出异常
            throw new InterruptedException();
        }
        int index = --count; // parties 副本进行递减
        if (index == 0) { // tripped
            Runnable command = barrierCommand;
            if (command != null) {  
                try {
	                // 执行构造时传递过来的第二个参数的run方法 线程对象
                    command.run();  
                } catch (Throwable ex) {  
                    breakBarrier();  
                    throw ex;  
                }  
            }
            // 进入下一个周期来唤醒所有线程 notifyAll()
            // 并恢复 generation 和 count
            nextGeneration();  
            return 0;  
        }  
		
        for (;;) {
            try {
                if (!timed)
	            // 不带有超时的阻塞
                    trip.await(); // condition 条件阻塞
                else if (nanos > 0L)
                // await 带有超时的阻塞
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
	            // 线程被中断且在同一个周期内还未被打破栅栏
                if (g == generation && ! g.broken) {
                    breakBarrier(); // 打破栅栏并返回异常
                    throw ie;
                } else {
                    Thread.currentThread().interrupt(); // 否则继续中断
                }
            }
			// broken 状态抛出异常
            if (g.broken)  
                throw new BrokenBarrierException();  
			// 不是同一个周期内返回周期
            if (g != generation)  
                return index;  
			// 超时抛出 toe 异常
            if (timed && nanos <= 0L) {
                breakBarrier(); // 打破栅栏
                throw new TimeoutException();  
            }  
        }  
    } finally {
        lock.unlock(); // 释放锁
    }  
}

public int await() throws InterruptedException, BrokenBarrierException {  
    try {  
        return dowait(false, 0L);  
    } catch (TimeoutException toe) {  
        throw new Error(toe); // cannot happen  
    }  
}

CyclicBarrier 需要两个参数,第一个参数是 parties 是参与循环屏障的线程个数,第二个参数是 barrierCommand 是参与被屏障的线程实例。CyclicBarrier 的作用就是若要执行 barrierCommand.run() 则必须先完成 parties 个线程的任务,主要实现了一种设定前提条件的作用。

深入浅出Condition

基本概念

Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。

启动和唤醒案例

代码语言:javascript
复制
public class ConditionDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        ConditionDemoWait cdw = new ConditionDemoWait(lock, condition);
        ConditionDemoNotify cdn = new ConditionDemoNotify(lock, condition);
        new Thread(cdw).start();
        new Thread(cdn).start();
    }
}

record ConditionDemoWait(Lock lock, Condition condition) implements Runnable {
    @Override
    public void run() {
        System.out.println("begin - ConditionDemoWait");
        try {
            lock.lock();
            condition.await();
            System.out.println("end - ConditionDemoWait");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
}

record ConditionDemoNotify(Lock lock, Condition condition) implements Runnable {
    @Override
    public void run() {
        System.out.println("begin - ConditionDemoNotify");
        try {
            lock.lock();
            condition.signal(); // 条件释放
            System.out.println("end - ConditionDemoNotify");
        } finally {
            lock.unlock();
        }
    }
}

得到如下的运行结果:

代码语言:javascript
复制
begin - ConditionDemoWait
begin - ConditionDemoNotify
end - ConditionDemoNotify
end - ConditionDemoWait

原理分析

await()方法

代码语言:javascript
复制
// 条件类型节点
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
	// 指向下一个 Node
    ConditionNode nextWaiter;
	
    public final boolean isReleasable() {  
        return status <= 1 || Thread.currentThread().isInterrupted();  
    }  
	
    public final boolean block() {  
        while (!isReleasable()) LockSupport.park();  
        return true;  
    }  
}

// ConditionObject 是 AQS 的一个内部类实现了 Condition
public class ConditionObject implements Condition, java.io.Serializable {
	// Condition 的本质是一个单向链表只有 next 没有 prev
	private transient ConditionNode firstWaiter; 
	private transient ConditionNode lastWaiter;
	
	private int enableWait(ConditionNode node) {
	    if (isHeldExclusively()) {
	        node.waiter = Thread.currentThread();
	        node.setStatusRelaxed(COND | WAITING);
	        ConditionNode last = lastWaiter;
	        if (last == null)
	            firstWaiter = node;
	        else
	            last.nextWaiter = node;
	        lastWaiter = node;
	        int savedState = getState();
	        // 尝试释放锁 简介调用重入锁的 tryRelease()
	        if (release(savedState))
		        // 返回 state
	            return savedState;
	    }
	    node.status = CANCELLED;
	    throw new IllegalMonitorStateException();
	}
	// 阻塞使得后续的方法无法运行
	public final void await() throws InterruptedException {  
	    if (Thread.interrupted())  
	        throw new InterruptedException();
	    // 创建一个 Node 并把当前的线程放进去
	    ConditionNode node = new ConditionNode();
	    int savedState = enableWait(node);
	    LockSupport.setCurrentBlocker(this);
	    boolean interrupted = false, cancelled = false, rejected = false;
	    // 判断节点是否在 AQS 队列上
	    while (!canReacquire(node)) {
	        if (interrupted |= Thread.interrupted()) {
	            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
	                break;
	        } else if ((node.status & COND) != 0) {
	            try {
	                if (rejected)
	                    node.block();
	                else
	                    ForkJoinPool.managedBlock(node);
	            } catch (RejectedExecutionException ex) {
	                rejected = true;
	            } catch (InterruptedException ie) {
	                interrupted = true;
	            }
	        } else
	            Thread.onSpinWait()
	    }
	    LockSupport.setCurrentBlocker(null);
	    node.clearStatus();
	    acquire(node, savedState, false, false, false, 0L);
	    if (interrupted) {
	        if (cancelled) {
	            unlinkCancelledWaiters(node);
	            throw new InterruptedException();
	        }
	        Thread.currentThread().interrupt();
	    }
	}
}

signal()方法

代码语言:javascript
复制
public class ConditionObject implements Condition, java.io.Serializable {
	public final void signal() {  
	    ConditionNode first = firstWaiter;  
	    if (!isHeldExclusively())  
	        throw new IllegalMonitorStateException();  
	    if (first != null)
		    // 调用 doSignal()
	        doSignal(first, false);  
	}
	
	private void doSignal(ConditionNode first, boolean all) {  
		while (first != null) {  
			ConditionNode next = first.nextWaiter;
			// 说明 condition 节点已经没有了 下一个节点需要被释放掉
			if ((firstWaiter = next) == null)  
				lastWaiter = null;
			// CAS 替换
			if ((first.getAndUnsetStatus(COND) & COND) != 0) {
				// 转移 AQS 队列
				enqueue(first);  
				if (!all)  
					break;  
			}  
			first = next;  
		}  
	}
}

流程图

bc99b6cb803b51ce0a94cb847058d7ef
bc99b6cb803b51ce0a94cb847058d7ef

Condition 实质上就是两个队列不断地挂起、转移,且必须是基于 AQS 通信管道进行通信的。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-01-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 深入浅出CyclicBarrier
    • CyclicBarrier的基本使用
      • 基本概念
      • 数据导入案例
      • 使用场景
    • CyclicBarrier的原理分析
      • 原理图
      • await()方法
  • 深入浅出Condition
    • 基本概念
      • 启动和唤醒案例
        • 原理分析
          • await()方法
          • signal()方法
          • 流程图
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档