前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java多线程:条件变量

Java多线程:条件变量

原创
作者头像
冰寒火
修改2023-02-18 20:09:36
7690
修改2023-02-18 20:09:36
举报
文章被收录于专栏:软件设计软件设计

一、概览

条件变量将因不同条件而无法推进的线程分别阻塞在不同的条件队列上,可以精细控制线程同步,降低惊群效应。

1 类图

类图
类图

2 数据结构

代码语言:java
复制
//ExclusiveNode、SharedNode、ConditionNode都继承此类
abstract static class Node {
    volatile Node prev;       // initially attached via casTail
    volatile Node next;       // visibly nonnull when signallable
    Thread waiter;            // visibly nonnull when enqueued
    volatile int status;      // written by owner, atomic bit ops by others
}

static final class ConditionNode extends Node
    implements ForkJoinPool.ManagedBlocker {
    ConditionNode nextWaiter;            // link to next waiting node
}
//条件队列
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient ConditionNode firstWaiter;
    /** Last node of condition queue. */
    private transient ConditionNode lastWaiter;
}

线程执行await后就会进入条件队列,等被唤醒时重新进入同步队列。

二、signal流程

signal会唤醒条件队列上的首个线程,而signalAll会唤醒全部线程,唤醒流程如下:

  1. 拿到firstWaiter,取消COND标志,并将node从条件队列上移除;
  2. 将node转入到同步队列,并调用LockSupport唤醒线程;
代码语言:java
复制
public final void signal() {
    ConditionNode first = firstWaiter;
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    if (first != null)
        doSignal(first, false);
}
public final void signalAll() {
        ConditionNode first = firstWaiter;
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        if (first != null)
            doSignal(first, true);
    }
//AQS
private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        ConditionNode next = first.nextWaiter;
        if ((firstWaiter = next) == null)
            lastWaiter = null;
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {//取消COND状态
            enqueue(first);//转入同步队列
            if (!all)
                break;
        }
        first = next;
    }
}

final void enqueue(Node node) {
    if (node != null) {
        for (;;) {
            Node t = tail;
            node.setPrevRelaxed(t);        // avoid unnecessary fence
            if (t == null)                 // initialize
                tryInitializeHead();
            else if (casTail(t, node)) {
                t.next = node;
                if (t.status < 0)          // wake up to clean link
                    LockSupport.unpark(node.waiter);
                break;
            }
        }
    }
}

三、await流程

await流程如下:

  1. 创建ConditionNode,并保存AQS的status值,然后释放掉锁,再将ConditionNode加入条件队列;
  2. 进入while循环,ForkJoinPool.managedBlock(node)最终会调用LockSupport.park阻塞线程;
  3. 当本线程被signal唤醒时,node已加入到同步队列,canReacquire返回true,跳出循环;
  4. 再次调用AQS.acquire获取锁,以原来的savedState设置AQS的status。
代码语言:java
复制
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    ConditionNode node = new ConditionNode();
    int savedState = enableWait(node);//加入条件队列
    LockSupport.setCurrentBlocker(this); // for back-compatibility,将AQS对象设置到thread中
    boolean interrupted = false, cancelled = false, rejected = false;
    while (!canReacquire(node)) {//如果被唤醒进入同步队列后就可以跳出循环
        if (interrupted |= Thread.interrupted()) {
            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                break;              // else interrupted after signal
        } else if ((node.status & COND) != 0) {
            try {
                if (rejected)
                    node.block();
                else
                    ForkJoinPool.managedBlock(node);//阻塞线程,最终会调用LockSupport.park()
            } catch (RejectedExecutionException ex) {
                rejected = true;
            } catch (InterruptedException ie) {
                interrupted = true;
            }
        } else
            Thread.onSpinWait();    // awoke while enqueuing
    }
	//被唤醒
    LockSupport.setCurrentBlocker(null);
    node.clearStatus();//
	//lock.lock()方法:acquire(null, arg, false, false, false, 0L);
	//重新获取锁时已原来的savedState
    acquire(node, savedState, false, false, false, 0L);//重新获取锁,此时该节点已经进入了同步队列,有可能直接tryAcquire成功跳出循环,也可能需要两次循环修改node.status为WAITING、park。
    if (interrupted) {
        if (cancelled) {
            unlinkCancelledWaiters(node);
            throw new InterruptedException();
        }
        Thread.currentThread().interrupt();
    }
}

enableWait方法需要保存线程此时现场状态用于将来恢复,加入条件队列并释放锁。

代码语言:java
复制
/**
 * Adds node to condition list and releases lock.
 *
 * @param node the node
 * @return savedState to reacquire after wait
 */
private int enableWait(ConditionNode node) {
    if (isHeldExclusively()) {//Sync 判断是否是持有锁的线程
        node.waiter = Thread.currentThread();
        node.setStatusRelaxed(COND | WAITING);//设置标志
        //加入条件队列
        ConditionNode last = lastWaiter;
        if (last == null)
            firstWaiter = node;
        else
            last.nextWaiter = node;
        lastWaiter = node;
        //缓存状态用于恢复
        int savedState = getState();
        if (release(savedState))//AQS.release释放锁
            return savedState;
    }
    node.status = CANCELLED; // lock not held or inconsistent
    throw new IllegalMonitorStateException();
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概览
    • 1 类图
      • 2 数据结构
      • 二、signal流程
      • 三、await流程
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档