专栏首页做不甩锅的后端多线程基础(十七):Condition及ConditionObjet源码分析

多线程基础(十七):Condition及ConditionObjet源码分析

1.Condition说明

在java中,为了配合ReentrantLock等Lock的实现类实现锁的多条件等待,为此java设计了Condition接口。在AQS中的主要结构如下:

ConditionObject是AQS的内部类,在使用的时候,通过newCondition方法创建Condition。

2 Condition接口

2.1 Condition注释

Condition提供了管程模型中,Object的wait,notify、notifyAll方法类似的功能,以不同的对象与任何Lock结合使用,每对象由多个等待的集合。如果用Lock替换了ynchronized的使用,则Condition将替换管程模型中Object的wait、notify、notifyAll的使用。 Condition也成为条件队列,条件变量,为一个线程终止执行等待,直到另外的线程达到某些触发条件而进行通知提供了一种手段,由于对该共享状态的信息访问发生在不同的线程中,因此必须对其进行保护,因此需要某种形式的锁与该条件相关联,等待条件提供的关键属性是原子的,会释放关联锁挂起当前线程,就像wait一样。 Condition实例本质上绑定到锁,要获取特定Lock实例的Condition实例,请使用newCondition。 例如,假如我们有一个有界缓冲区,它支持put和take方法,如果尝试在空缓冲区上执行take,则线程将阻塞,直到有可用项为止,如果在完整的缓冲区上尝试使用put,则线程将阻塞,直到有可用空间为止。我们希望在单独的等待集合中继续等待put线程和take线程,以便我们可以使用仅在缓冲区中的项目或空间可以通知单个线程的优化。这可以使用两个Condition来实现。

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

  final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     <b>lock.lock();
     try {</b>
       while (count == items.length)
        <b>notFull.await();</b>
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       <b>notEmpty.signal();</b>
     <b>} finally {
       lock.unlock();
     }</b>
   }

   public Object take() throws InterruptedException {
     <b>lock.lock();
     try {</b>
       while (count == 0)
         <b>notEmpty.await();</b>
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       <b>notFull.signal();</b>
       return x;
     <b>} finally {
       lock.unlock();
     }</b>
   }
 }

ArrayBlockingQueue提供了此类功能。因此我们没有必要再使用上述的方式来实现。 Condition实现了可以提供与Object监视方法不同的行为和语义。例如,对通知的保证顺序,或在执行通知的时候不需要锁定,如果实现提供了这周特殊的语义,则实现必须记录这些语义。 请注意,Condition实例只是普通对象,他们本身可以用作synhronized的目标,并且可以具有自己的管程,wait和notify/notifyAll方法。获取Condition实例的管程锁,或者使用其管程方法与获得该Condition关联的Lock或者使用其await方法进行等待没有任何关联关系。因此要避免await和signal与wait/notify混淆。尽量不要在synchronized中使用Condition。 除非另有说明,否则任何为空的参数都将导致NullPointerException。 实现注意事项: 等待Condition的时候,会出现虚假的唤醒,通常做为底层平台语义的让步而出现,这对大多数应用程序几乎没有实际影响,因为其应该始终在循环中等待Condition。以测试正在等待的状态。一个实现可以自由的消除虚假唤醒的可能性,但是建议应用的程序员始终假定他们会发生。因此总是在循环中等待。 条件等待的三种形式,可中断,不可中断,定时,在某些平台上的实现容易程度和性能特征可能有所不同。特别的是,可能难以提供这些功能并维护特定的语义。例如排序保证。此外中断线程的实际挂起的能力可能并不总在所有的平台上都可行。 因此,不需要实现为所有三种等待形式定义完全相同的语义保证,也不需要支持中断线程的实际挂起。 需要一个实现来清楚的记录每个等待提供的语义和保证,并且当一个实现确定支持红缎线程挂起时,它必须遵守此接口中定义的中断语句。 由于中断通常意味着取消,并且通常不进行中断检查,因此与正常返回方法相比,实现可能更喜欢中断进行响应。即使可以证明中断发生在另外一个可能已取消阻塞线程的操作之后。也是如此,实现应该记录此行为。

2.2 Condition的方法

2.2.1 await

 void await() throws InterruptedException;

说明: 使当前线程等待,直到收到中断信号。 与Condition关联的锁被原子的释放,并且出于线程调度的目的,当前线程被禁用,并且出于休眠状态,直到出现如下四种情况:

  • 其他一些线程为此Condition调用了signal方法,而当前线程恰好被选择为要唤醒的线程。
  • 其他一些线程为此Condition调用signalAll方法。
  • 当前线程有其他一些线程,Thread调用interrupt 。并且支持中断。
  • 假唤醒。

在上述所有情况下, 在此方法可以返回之前,当前线程必须重新获取与此条件关联的锁。当线程返回时,保证持有此锁。 如果当前线程: 在进入此方法时已设置其中断状态,或者是调用interrupt 之后正在等待中断线程。 然后将引发InterruptedException并清除当前线程的中断状态,在第一种情况下,没有规定在释放锁之前进行中断测试。 假定在调用此方法的时候当前线程持有此Condition关联的锁,具体取决于实现,如果是,则将引发IllegalMonitorStateException并且实现必须记录该事实。 与响应信号的正常方法返回相比,实现可能更喜欢响应中断,在那种情况下,实现必须确保将信号重定向到另外一个等待的线程(如果有)。

2.2.2 awaitUninterruptibly

void awaitUninterruptibly();

这个方法将导致当前线程等待,直到有其他线程发送信号。 该条件关联的锁,被原子释放,并且出于线程调度的目的,当前线程被禁用,并且出于休眠状态,直到以下三种情况之一产生:

  • 有其他的线程调用signal,恰好当前线程在Condition的等待队列中恰好被选中。
  • 有其他的线程调用ondition的signalAll方法。
  • 虚假的唤醒。

在所有情况下,此方法在返回当前线程之前必须重新获取与此条件相关的锁,当线程返回时它确保持有这个锁。 如果当前线程进入此方法时已设置其中的中断状态,或者在等待时,inturrupt,则它将继续等待,直到收到信号。当其最终从该方法返回的时候,其中断状态任然将被设置。 实现注意: 在调用此方法时,假定当前线程持有与此Condition关联的锁,由实现方来确认是否这种情况。如果不是,则如何确定,通常将引发IllegalMonitorStateException异常。并且实现必须记录。

2.2.3 awaitNanos

 long awaitNanos(long nanosTimeout) throws InterruptedException;

使当前线程等待,直到收到信号或中断,或指定的等待时间已超时。 这个方法将导致与该条件关联的锁被释放,并且由于线程池调度的目的,当前线程被禁用,并且出于休眠状态,直到如下五种情况之一发生:

  • 一些其他的线程调用了signal方法,恰好当前线程被选为唤醒的线程。
  • 一些其他的先调用了此条件变量的signalAll方法。
  • 一些其他的线程中断了当前线程,支持中断线程暂停。
  • 已超过指定的等待时间。
  • 假唤醒。

在所有情况下,在此方法可以返回之前,当前线程必须重新获取与此条件相关的锁。当线程返回时,保证持有此锁。 如果当前线程: 在进入该方法时已设置其中断状态,或者是interrupt ,但正在等待并支持中断线程。 然后将导致InterruptedException并清除当前线程的中断状态。在第一种情况下,没有规定在释放锁之前是否进行了中断测试。 给定返回时提供的nanosTimeout,该方法将返回等待的纳秒估计值,如果超时,则返回小于或者等于0的值,此值可用于确定 等待返回但仍不满足条件等待情况下是否重新等待,以及等待多长时间。 此方法的典型用法如下:

boolean aMethod(long timeout, TimeUnit unit) {
   long nanos = unit.toNanos(timeout);
   lock.lock();
   try {
     while (!conditionBeingWaitedFor()) {
       if (nanos <= 0L)
         return false;
       nanos = theCondition.awaitNanos(nanos);
     }
     // ...
   } finally {
     lock.unlock();
   }
}}

设计说明:此方法需要一个纳秒级的参数,以避免在报告剩余时间时出现截断错误。这样的精度损失将使程序员难以确保总的等待时间不会系统地短于重新等待发生时指定的时间。 实现注意事项: 当调用此方法时,假定当前线程有与此Condition关联的锁,由实施来确定是否是这种情况,如果不是,则如何确定。通常将引发IllegalMonitorStateException异常,并进行记录。 与正常方法返回响应信号相比,或者与指示经过指定的等待时间相比,实现可能更喜欢响应中断。无论哪种情况,实现都必须确保将信号重定向到另一个等待线程(如果有)。

2.2.4 await

 boolean await(long time, TimeUnit unit) throws InterruptedException;

此方法与awaitNanos等价,只是此处采用了时间单元。

2.2.5 awaitUntil

 boolean awaitUntil(Date deadline) throws InterruptedException;

此方法与awaitNanos以及await都类似,只是此处将等待时间改为了具体的Date。

2.2.6 signal

void signal();

此方法将唤醒一个等待线程。 如果有任何线程在这种情况下等待,则选择一个线程进行唤醒。然后,该线程必须重新获取锁,然后才能从{@code await}返回。 当调用此方法时,实现可能(并且通常确实)要求当前线程持有与此{@code Condition}关联的锁。实现必须记录此前提条件,以及如果未持有该锁,则应采取的任何措施。通常,将引发诸如{@link IllegalMonitorStateException}之类的异常。

2.2.7 signalAll

void signalAll();

此方法将唤醒全部等待线程。 如果有任何线程在这种情况下等待,那么它们都将被唤醒。每个线程必须重新获取锁,然后才能从{@code await}返回。 当调用此方法时,实现可能(并且通常确实)要求当前线程持有与此{@code Condition}关联的锁。实现必须记录此前提条件,以及如果未持有该锁,则应采取的任何措施。通常,将引发诸如{@link IllegalMonitorStateException}之类的异常。

3.AQS中的ConditionObject实现

我们来看看在AQS中ConditionObject是如何实现的。

3.1 类结构及成员变量

ConditionObject是AQS中对Condition的一个实现。 此类的方法文档从锁和条件用户的角度描述了其实现机制,而不是行为规范。此类的导出版本通常需要随描述条件语义 文档,这些条件语义依赖于关联的AbstractQueueSynchronizer语义。

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    /**
     * Creates a new {@code ConditionObject} instance.
     */
    public ConditionObject() { }
}

可以看到,此类内部实际上只有两个变量,分别是指向waiter队列的首尾指针。

最终构成的数据结构如下图:

3.2 重要方法

3.2.1 await

实现了condition接口中的await方法。

public final void await() throws InterruptedException {
   //首先判断线程的中断状态,如果中断则抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
   //通过add方法新增一个Node节点
    Node node = addConditionWaiter();
    //对这个node节点调用release方法 这与对应的锁有关系,实际上Condition是配合锁使用,意思就是将Node添加到waiter队列,并释放锁
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    //判断Node节点是否出于等待Lock的队列中
    while (!isOnSyncQueue(node)) {
        //将当前线程休眠
        LockSupport.park(this);
        //检查中断状态是否为0,如果不为0则跳出循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //将该节点入队
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    //如果中断模式不为0 则中断之后再等待
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

3.2.2 awaitNanos

实现定时条件等待的方法。

public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    //首先判断中断状态
    if (Thread.interrupted())
        throw new InterruptedException();
    //同样,添加到条件队列
    Node node = addConditionWaiter();
    //释放锁
    long savedState = fullyRelease(node);
    //计算截至的时间
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    //判断后期是否存在其他节点
    while (!isOnSyncQueue(node)) {
        //如果此时时间不合法 则取消并返回
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        //如果时间合法,纳秒调用parkNanos将计算的时间用于park
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        //再次对中断状态进行检查
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        //计算超时的纳秒数,这为循环中下一次重新计算时间。
        nanosTimeout = deadline - System.nanoTime();
    }
    //获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    //如果中断再次出现,则中断并等待
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    //将没有花费完的时间返回
    return deadline - System.nanoTime();
}

3.2.3 awaitUntil

此方法实现定时的等待方法。

public final boolean awaitUntil(Date deadline)
        throws InterruptedException {
    //截至时间
    long abstime = deadline.getTime();
    //判断中断状态
    if (Thread.interrupted())
        throw new InterruptedException();
    //将节点添加到waiter队列
    Node node = addConditionWaiter();
    //释放锁
    long savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    //判断Node是否还有后续节点
    while (!isOnSyncQueue(node)) {
        //判断超时时间是否达到,如果达到则取消等待的任务,退出
        if (System.currentTimeMillis() > abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        //反之将当前线程进行park,通过until的方法调用
        LockSupport.parkUntil(this, abstime);
        //检查中断状态
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //获得锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    //判断中断状态
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

3.2.4 await(long time, TimeUnit unit)

public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    //计算纳秒
    long nanosTimeout = unit.toNanos(time);
    //判断中断状态
    if (Thread.interrupted())
        throw new InterruptedException();
    //添加到Waiter队列
    Node node = addConditionWaiter();
    //释放锁 即从锁的等待队列移除
    long savedState = fullyRelease(node);
    //计算deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    boolean timedout = false;
    int interruptMode = 0;
    //判断是否存在后续节点
    while (!isOnSyncQueue(node)) {
       //如果时间失效,则取消等待
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        //如果时间有效 则通过parkNanos 将当前线程暂停
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        //检查中断状态
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    //获得锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    //判断中断状态
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

3.2.5 signal

发出信号,将一个队列中的等待的线程唤醒

public final void signal() {
    //返回调用的线程是否独占,这个是由lock的实现类实现的方法,也说明signal用于独占模式
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
        
    Node first = firstWaiter;
    //如果当前等待的waiter不为空 则执行
    if (first != null)
        //执行doSignal
        doSignal(first);
}

3.2.6 doSignal

实际上这个方法就是将链表中的第一个Node变成null。之后将链表后面Node前移。

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

3.2.7 signalAll

此方法的目的是发出信号,将队列中的全部等待Node都唤醒。

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //主要实现方法在doSignalAll
        doSignalAll(first);
}

3.2.8 doSignalAll

此方法将队列中的全部Node都清空,并挨个调用transferForSignal方法。

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

此方法会将Node的条件等待队列重新入队到锁的竞争队列。并执行unpark方法。

4 总结

如上是对Condition以及ConditionObject源码的一些分析,实际上可以看出,ConditionObject中的对于await的实现,恰恰是与获得锁的tryLock等方法是对立的操作。await方法将Node从Lock的等待队列移动到了Waiter队列,而signal/signalAll则与之相反,将其从Waiter队列移动到Lock的等待队列。 在队列中都会被park。之后移出队列的时候unpark用来竞争锁。如果竞争成功则拿到锁,从队列移除。否则再次会被park,这也就是前面说的假唤醒。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 多线程基础(十四):AbstractQueuedSynchronizer源码分析

    类AbstractQueuedSynchronizer是java并发包中的核心,是实现大部分并发工具类的底层工具类,现在对这个类的源码进行分析。

    冬天里的懒猫
  • 多线程基础(十八):ReentrantReadWriteLock源码分析

    ReentrantReadWriteLock是基于AQS实现的可重入的读写锁。这个锁在使用的时候将锁分为了两个部分,ReadLock和WriteLock。实际上...

    冬天里的懒猫
  • 多线程基础(一): 线程概念及生命周期

    什么是进程,相信大家都知道什么是进程却很难解释清楚。百科中的解释是:进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调...

    冬天里的懒猫
  • JAVA-LOCK之底层实现原理(源码分析)

    首先和Synchronized(可以参考) 的不同之处,Lock完全用Java写成,在java这个层面是无关JVM实现的。其实现都依赖java.util.con...

    海涛
  • 【深入AQS原理】我画了35张图就是为了让你深入 AQS

    谈到并发,我们不得不说AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知...

    一枝花算不算浪漫
  • AbstractQueuedSynchronizer超详细原理解析

     今天我们来研究学习一下AbstractQueuedSynchronizer类的相关原理,java.util.concurrent包中很多类都依赖于这个类所提供...

    程序员历小冰
  • ReentrantLock知识点梳理

    接下来几篇文章会对JUC并发包里面的锁工具类做下梳理,如:ReentrantLock、

    用户7676729
  • 基于AQS原理实现的锁

    你好,我是疾风先生,先后从事外企和互联网大厂的java和python工作, 记录并分享个人技术栈,欢迎关注我的公众号,致力于做一个有深度,有广度,有故事的工程师...

    keithl
  • Java并发编程--Lock

      Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。synchronized方法或代码块的使用提供了对与每个对象相关的...

    在周末
  • 详细讲解并发编程中不得不学的AQS

    谈到并发编程,不得不说AQS(AbstractQueuedSynchronizer),这可谓是Doug Lea老爷子的大作之一。AQS即是抽象队列同步器,是用来...

    java技术爱好者

扫码关注云+社区

领取腾讯云代金券