ReentrantLock 源码浅析

ReentrantLock 介绍

一个可重入的互斥锁,它具有与使用{synchronized}方法和语句访问的隐式监视器锁相同的基本行为和语义,但它具有可扩展的能力。

一个ReentrantLock会被最后一次成功锁定(lock)的线程拥有,在还没解锁(unlock)之前。当锁没有被其他线程拥有的话,一个线程执行『lock』方法将会返回,获取锁成功。一个方法将会立即的返回,如果当前线程已经拥有了这个锁。可以使用『isHeldByCurrentThread』和『getHoldCount』来检查当前线程是否持有锁。

这个类的构造方法会接受一个可选的“fairness”参数。当该参数设置为true时,在发生多线程竞争时,锁更倾向将使用权授予给最长等待时间的线程。另外,锁不保证任何特定的访问顺序。程序在多线程情况下使用公平锁来访问的话可能表现出较低的吞吐量(如,较慢;经常慢很多)与比使用默认设置相比,但是在获取锁上有较小的时间差异,并保证不会有饥饿(线程)。然而需要注意的是,公平锁并不保证线程调度的公平性。(也就是说,即使使用公平锁,也无法确保线程调度器是公平的。如果线程调度器选择忽略一个线程,而该线程为了这个锁已经等待了很长时间,那么就没有机会公平地处理这个锁了) 还需要注意的是,没有时间参数的『tryLock()』方法是没有信誉的公平设置。它将会成功如果锁是可获取的,即便有其他线程正在等待获取锁。

除了对Lock接口的实现外,这个类还定义了一系列的public和protected方法用于检测lock的state。这些方法中的某些方法仅用于检测和监控。

这个类的序列化行为同lock内置的行为是一样的:一个反序列化的锁的状态(state)是未锁定的(unlocked),无论它序列化时的状态(state)是什么。

这个锁支持同一个线程最大递归获取锁2147483647(即,Integer.MAX_VALUE)次。如果尝试获取锁的次数操作了这个限制,那么一个Error获取lock方法中抛出。

AbstractQueuedSynchronizer

ReentrantLock的公平锁和非公平锁都是基于AbstractQueuedSynchronizer(AQS)实现的。ReentrantLock使用的是AQS的排他锁模式,由于AQS除了排他锁模式还有共享锁模式,本文仅对ReentrantLock涉及到的排他锁模式部分的内容进行介绍,关于共享锁模式的部分会在 CountDownLatch 源码浅析一文中介绍。

AQS提供一个框架用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和同步器(信号量,事件等)。这个类被设计与作为一个有用的基类,一个依赖单一原子值为代表状态的多种同步器的基类。子类必须将修改这个状态值的方法定义为受保护的方法,并且该方法会根据对象(即,AbstractQueuedSynchronizer子类)被获取和释放的方式来定义这个状态。根据这些,这个类的其他方法实现所有排队和阻塞的机制。子类能够维护其他的状态属性,但是只有使用『getState』方法、『setState』方法以及『compareAndSetState』方法来原子性的修改 int 状态值的操作才能遵循相关同步性。

等待队列节点类 ——— Node

等待队列是一个CLH锁队列的变体。CLH通常被用于自旋锁(CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。)。我们用它来代替阻塞同步器,但是使用相同的基本策略,该策略是持有一些关于一个线程在它前驱节点的控制信息。一个“status”字段在每个节点中用于保持追踪是否一个线程需要被阻塞。一个节点会得到通知当它的前驱节点被释放时。队列中的每一个节点都作为一个持有单一等待线程的特定通知风格的监视器。状态字段不会控制线程是否被授予锁等。一个线程可能尝试去获取锁如果它在队列的第一个。但是首先这并不保证成功,它只是给与了竞争的权力(也就是说,队列中第一个线程尝试获取锁时,并不保证一定能得到锁,它只是有竞争锁的权力而已)。所以当前被释放的竞争者线程可能需要重新等待获取锁。 (这里说的"队列中的第一个的线程"指的时,从队列头开始往下的节点中,第一个node.thread != null的线程。因为,AQS队列的head节点是一个虚节点,不是有个有效的等待节点,因此head节点的thread是为null的。)

为了排队进入一个CLH锁,你可以原子性的拼接节点到队列中作为一个新的队尾;对于出队,你只要设置头字段。(即,入队操作时新的节点会排在CLH锁队列的队尾,而出队操作就是将待出队的node设置为head。由此可见,在AQS中维护的这个等待队列,head是一个无效的节点。初始化时head是一个new Node()节点;在后期的操作中,需要出队的节点就会设置到head中。)

          +------+  prev +-----+       +-----+
     head |      | <---- |     | <---- |     |  tail
          +------+       +-----+       +-----+

插入到一个CLH队列的请求只是一个对“tail”的单个原子操作,所以有一个简单的从未入队到入队的原子分割点。类似的,出队调用只需要修改“head”。然而,节点需要更多的工作来确定他们的后继者是谁,部分是为了处理由于超时和中断而导致的可能的取消。 (也就是说,一个node的后继节点不一定就是node.next,因为队列中的节点可能因为超时或中断而取消了,而这些取消的节点此时还没被移除队列(也许正在移除队列的过程中),而一个node的后继节点指的是一个未被取消的有效节点,因此在下面的操作中你就会发现,在寻找后继节点时,寻找的都是当前节点后面第一个有效节点,即非取消节点。)

“prev”(前驱)连接(原始的CLH锁是不使用前驱连接的),主要用于处理取消。如果一个节点被取消了,它的后驱(通常)会重连接到一个未被取消的前驱。

另外我们使用“next”连接去实现阻塞机制。每个节点的线程ID被它们自己的节点所持有,所以前驱节点通知下一个节点可以被唤醒,这是通过遍历下一个链接(即,next字段)来确定需要唤醒的线程。后继节点的决定必须同‘新入队的节点在设置它的前驱节点的“next”属性操作(即,新入队节点为newNode,在newNode的前驱节点preNewNode进行preNewNode.next = newNode操作)’产生竞争。一个解决方法是必要的话当一个节点的后继看起来是空的时候,从原子更新“tail”向前检测。(或者换句话说,next链接是一个优化,所以我们通常不需要反向扫描。)

取消引入了对基本算法的一些保守性。当我们必须为其他节点的取消轮询时,我们不需要留意一个取消的节点是在我们节点的前面还是后面。它的处理方式是总是根据取消的节点唤醒其后继节点,允许它们去连接到一个新的前驱节点,除非我们能够标识一个未被取消的前驱节点来完成这个责任。

  • waitStatus
volatile int waitStatus;

状态属性,只有如下值: ① SIGNAL: static final int SIGNAL = -1; 这个节点的后继(或者即将被阻塞)被阻塞(通过park阻塞)了,所以当前节点需要唤醒它的后继当它被释放或者取消时。为了避免竞争,获取方法必须首先表示他们需要一个通知信号,然后再原子性的尝试获取锁,如果失败,则阻塞。 也就是说,在获取锁的操作中,需要确保当前node的preNode的waitStatus状态值为’SIGNAL’,才可以被阻塞,当获取锁失败时。(『shouldParkAfterFailedAcquire』方法的用意就是这) ② CANCELLED: static final int CANCELLED = 1; 这个节点由于超时或中断被取消了。节点不会离开(改变)这个状态。尤其,一个被取消的线程不再会被阻塞了。 ③ CONDITION: static final int CONDITION = -2; 这个节点当前在一个条件队列中。它将不会被用于当做一个同步队列的节点直到它被转移到同步队列中,转移的同时状态值(waitStatus)将会被设置为0。(这里使用这个值将不会做任何事情与该字段其他值对比,只是为了简化机制)。 ④ PROPAGATE: static final int PROPAGATE = -3; 一个releaseShared操作必须被广播给其他节点。(只有头节点的)该值会在doReleaseShared方法中被设置去确保持续的广播,即便其他操作的介入。 ⑤ 0:不是上面的值的情况。 这个值使用数值排列以简化使用。非负的值表示该节点不需要信号(通知)。因此,大部分代码不需要去检查这个特殊的值,只是为了标识。 对于常规的节点该字段会被初始化为0,竞争节点该值为CONDITION。这个值使用CAS修改(或者可能的话,无竞争的volatile写)。

  • prev
volatile Node prev

连接到前驱节点,当前节点/线程依赖与这个节点waitStatus的检测。分配发生在入队时,并在出队时清空(为了GC)。并且,一个前驱的取消,我们将短路当发现一个未被取消的节点时,未被取消的节点总是存在因为头节点不能被取消:只有在获取锁操作成功的情况下一个节点才会成为头节点。一个被取消的线程绝不会获取成功,一个线程只能被它自己取消,不能被其他线程取消。

  • next
volatile Node next

连接到后继的节点,该节点是当前的节点/线程释放唤醒的节点。分配发生在入队时,在绕过取消的前驱节点时进行调整,并在出队列时清空(为了GC的缘故)。一个入队操作(enq)不会被分配到前驱节点的next字段,直到tail成功指向当前节点之后(通过CAS来将tail指向当前节点。『enq』方法实现中,会先将node.prev = oldTailNode;在需要在CAS成功之后,即tail = node之后,再将oldTailNode.next = node;),所以当看到next字段为null时并不意味着当前节点是队列的尾部了。无论如何,如果一个next字段显示为null,我们能够从队列尾向前扫描进行复核。被取消的节点的next字段会被设置为它自己,而不是一个null,这使得isOnSyncQueue方法更简单。

  • thread
volatile Thread thread

这个节点的入队线程。在构建时初始化,在使用完后清除。

  • nextWaiter
Node nextWaiter

链接下一个等待条件的节点,或者一个指定的SHARED值。因为只有持有排他锁时能访问条件队列,所以我们只需要一个简单的单链表来维持正在等待条件的节点。它们接下来会被转换到队列中以去重新获取锁。因为只有排他锁才有conditions,所以我们使用给一个特殊值保存的字段来表示共享模式。 也就是说,nextWaiter用于在排他锁模式下表示正在等待条件的下一个节点,因为只有排他锁模式有conditions;所以在共享锁模式下,我们使用’SHARED’这个特殊值来表示该字段。

源码分析

初始化
  • 初始化 ———— 公平锁:
ReentrantLock lock = new ReentrantLock(true)
  • 初始化 ———— 非公平锁:
ReentrantLock lock = new ReentrantLock()

或

ReentrantLock lock = new ReentrantLock(false)
lock
public void lock() {
    sync.lock();
}

获取锁。 如果其他线程没有持有锁的话,获取锁并且立即返回,设置锁被持有的次数为1. 如果当前线程已经持有锁了,那么只有锁的次数加1,并且方法立即返回。 如果其他线程持有了锁,那么当前线程会由于线程调度变得不可用,并处于休眠状态直到当前线程获取到锁,此时当前线程持有锁的次数被设置为1次。

  • 公平锁『lock()』方法的实现: 『FairSync#lock()』
final void lock() {
    acquire(1);
}

调用『acquire』在再次尝试获取锁失败的情况下,会将当前线程入队至等待队列。该方法会在成功获取锁的情况下才会返回。因此该方法是可能导致阻塞的(线程挂起)。

  • 非公平锁『lock()』方法的实现: 『NonfairSync#lock()』
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

① 尝试获取锁,若『compareAndSetState(0, 1)』操作成功(这步操作有两层意思。第一,当前state为0,说明当前锁没有被任何线程持有;第二,原子性的将state从’0’修改为’1’成功,说明当前线程成功获取了这个锁),则说明当前线程成功获取锁。那么设置锁的持有者为当前线程(『setExclusiveOwnerThread(Thread.currentThread())』)。 那么此时,AQS state为1,锁的owner为当前线程。结束方法。 ② 如果获取锁失败,则调用『acquire』在再次尝试获取锁失败的情况下,会将当前线程入队至等待队列。该方法会在成功获取锁的情况下才会返回。因此该方法是可能导致阻塞的(线程挂起)。

  • 公共方法『AbstractQueuedSynchronizer#acquire』
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

在排他模式下尝试获,忽略中断。该方法的实现至少执行一次『tryAcquire』方法,如果成功获取锁则返回。否则,线程将入队等待队列中,可能会重复阻塞和解除阻塞,以及调用『tryAcquire』直到成功获取锁。这个方法能被用于实现『Lock#lock』 ① 执行『tryAcquire』来尝试获取锁,如果成功(即,返回true)。则返回true退出方法;否则到第②步 ② 执行『acquireQueued(addWaiter(Node.EXCLUSIVE), arg)』 『addWaiter(Node.EXCLUSIVE)』:为当前线程创建一个排他模式的Node,并将这个节点加入等待队列尾部。 『acquireQueued』:已经入队的节点排队等待获取锁。 ③ 如果在尝试获取锁的过程中发现线程被标志位了中断。因为是通过『Thread.interrupted()』方法来检测的当前线程是否有被标志位中断,该方法会清除中断标志,所以如果线程在尝试获取锁的过程中发现被标识为了中断,则需要重新调用『Thread.currentThread().interrupt();』重新将中断标志置位。 该方法是排他模式下获取锁的方法,并且该方法忽略中断,也就说中断不会导致该方法的结束。首先,会尝试通过不公平的方式立即抢占该锁(『tryAcquire』),如果获取锁成功,则结束方法。否则,将当前线程加入到等待获取锁的队列中,如果当前线程还未入队的话。此后就需要在队列中排队获取锁了,而这就不同于前面非公平的方式了,它会根据FIFO的公平方式来尝试获取这个锁。而这个方法会一直“阻塞”直到成功获取到锁了才会返回。注意,这里的“阻塞”并不是指线程一直被挂起这,它可能被唤醒,然后同其他线程(比如,那么尝试非公平获取该锁的线程)竞争这个锁,如果失败,它会继续被挂起,等待被唤醒,再重新尝试获取锁,直到成功。 同时注意,关于中断的操作。因为该方法是不可中断的方法,因此若在该方法的执行过程中线程被标志位了中断,我们需要确保这个标志位不会因为方法的调用而被清除,也就是我们不处理中断,但是外层的逻辑可能会对中断做相关的处理,我们不应该影响中断的状态,即,“私自”在不处理中断的情况下将中断标志清除了。 先继续来看公平锁和非公平锁对『tryAcquire』方法的实现 tryAcquire 这类型的方法都不会导致阻塞(即,线程挂起)。它会尝试获取锁,如果失败就返回false。

  • FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平版本的『tryAcquire』方法。当前线程没有权限获取锁,除非递归调用到没有等待者了,或者当前线程就是第一个尝试获取锁的线程(即,等待队列中没有等待获取锁的线程)。 ① 获取AQS state,即,当前锁被获取的次数。如果为’0’,则说明当前锁没有被任何线程获取,那么执行『hasQueuedPredecessors』方法判断当前线程之前是否有比它等待更久准备获取锁的线程: a)如果有则方法结束,返回false; b)如果没有,则说明当前线程前面没有另一个比它等待更久的时间在等待获取这个锁的线程,则尝试通过CAS的方式让当前的线程获取锁。如果成功则设置持有锁的线程为当前线程(『setExclusiveOwnerThread(current)』),然后方法结束返回true。 ② 如果AQS state > 0,则说明当前锁已经被某个线程所持有了,那么判断这个持有锁的线程是否就是当前线程(『current == getExclusiveOwnerThread()』),如果是的话,尝试进行再次获取这个锁(ReentrantLock是一个可重入的锁)如果获取锁的次数没有超过上限的话(即,c + acquires > 0),则更新state的值为最终该锁被当前线程获取的次数,然后方法结束返回true;否则,如果当前线程获取这个锁的次数超过了上限则或抛出Error异常。再者如果当前线程不是持有锁的线程,则方法结束返回false。 『AbstractQueuedSynchronizer#hasQueuedPredecessors』

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

查询是否有线程已经比当前线程等待更长的时间在等待获取锁。 这个方法的调用等价于(但更高效与): 『getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()』 即,可见如下任一情况,则说明当前线程前面没有比它等待更久的需要获取锁的线程: a)当队列中第一个等待获取锁的线程是当前线程时 b)等待队列为空时。即,当前没有其他线程等待获取锁。 注意,因为中断和超时导致的取消可能发生在任何时候,该方法返回‘true’不能保证其他线程会比当前线程更早获得到锁。同样,由于队列是空的,在当前方法返回‘false’之后,另一个线程可能会赢得一个入队竞争。 这个方法被涉及用于一个公平的同步器,以避免闯入。如果这个方法返回’true’,那么像是一个(公平)同步器的『tryAcquire』方法应该返回’false’,以及『tryAcquireShared』方法需要返回一个负数值(除非这是一个可重入的锁,因为可重入锁,获取锁的结果还需要判断当前线程是否就是已经获取锁的线程了,如果是,则在没有超过同一线程可获取锁的次数上限的情况下,当前线程可以再次获取这个锁)。比如,一个公平的、可重入的、排他模式下的『tryAcquire』方法,可能看起来像是这样的:

返回: a)true:如果当前线程前面有排队等待的线程 b)false:如果当前线程是第一个等待获取锁的线程(即,一般就是head.next);或者等待队列为空。 该方法的正确性依赖于head在tail之前被初始化,以及head.next的精确性,如果当前线程是队列中第一个等待获取锁的线程的时候。 ① tail节点的获取一定先于head节点的获取。因为head节点的初始化在tail节点之前,那么基于当前的tail值,你一定能获取到有效的head值。这么做能保证接下来流程的正确性。举个反例来说明这么做的必要性:如果你按『Node h = head; Node t = tail;』的顺序来对h、t进行赋值,那么可能出现你在操作这两步的时候有其他的线程正在执行队列的初始化操作,那么就可能的带一个『h==null』,而『tail!=null』的情况(这种情况下,是不对的,因为tail!=null的话,head一定也不为null了),这使得『h != t』判断为true,认为当下是一个非空的等待队列,那么接着执行『s = h.next』就会抛出NPE异常了。但是当『Node t = tail; Node h = head;』按初始化相反的顺序来赋值的话,则不会有问题,因为我们保证了在tail取值的情况下,head的正确性。我们接下看下面的步骤,来说明为什么这么做就可以了。 ② 在获取完t、h之后,我们接着先判断『h != t』,该判断的用意在于,判断当前的队列是否为空。如果为true则说明,当前队列非空。如果为false 则说明当前队列为空,为空的话,方法就直接结束了,并返回false。 但是请注意,当『h != t』为true时,其实是有两种情况的: a)当tail和head都非空时,说明此时等待队列已经完成了初始化,head和tail都指向了其队列的头和队列的尾。 b)当“tail==null”同时“head != null”,则说明,此时队列正在被其他线程初始化,当前我们获取的h、t是初始化未完成的中间状态。但是没关系,下面的流程会对此请进行判断。 ③ 当『h != t』返回’true’的话,继续判断『(s = h.next) == null || s.thread != Thread.currentThread()』。这里的两个判断分别对应了两种情况: a)『(s = h.next) == null』返回’true’,则说明当获取的h、t为初始化的中间状态,因为第一个线程入队的时候,会先初始化队列,然后才对head的next值进行赋值,所以我们需要“s = h.next”是否为null进行判断,如果为’null’,则说明当前等待队列正在被初始化,并且有一个线程正在入队的操作中。所以此时方法直接结束,并且返回true。 b)如果『h != t』并且『(s = h.next) != null』,则说明当前线程已经被初始化好了,并且等待队列中的第一个等待获取锁的线程也已经入队了。那么接着我们就判断这个在等待队列中第一个等待获取锁的线程是不是当前线程『s.thread != Thread.currentThread()』,如果是的话,方法结束并返回false,表示当前线程前面没有比它等待更久获取这个锁的线程了;否则方法结束返回true,表示当前线程前面有比它等待更久希望获取这个锁的线程。 『AbstractQueuedSynchronizer#addWaiter』

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

根据给定的模式创建当前线程的节点,并将创建好的节点入队(加入等待队列尾部)。 首先在队列非空的情况下会尝试一次快速入队,也就是通过尝试一次CAS操作入队,如果CAS操作失败,则调用enq方法进行“自旋+CAS”方法将创建好的节点加入队列尾。 在排他模式下,将节点加入到锁的同步队列时,Node的mode(即,waitStatus)为’EXCLUSIVE’。waitStatus是用于在排他锁模式下当节点处于条件队列时表示下一个等待条件的节点,所以在加入到锁的同步队列中(而非条件队列),我们使用’EXCLUSIVE’这个特殊值来表示该字段。本文主要围绕共享锁模式的介绍,就不对其进行展开了,关于排他锁的内容会在“ReentrantLock源码解析”一文中介绍。

  • NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

对父类AQS tryAcquire方法的重写。调用『nonfairTryAcquire(acquires)』方法,非公平的尝试获取这个可重入的排他锁 『nonfairTryAcquire』

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

执行不公平的『tryLock』。『tryAcquire』在子类中实现,但是都需要不公平的尝试在『tryLock』方法中。 ① 获取state值,如果为’0’,则说明当前没有线程占用锁,那么调用CAS来尝试将state的值从0修改为’acquires’的值, a)如果成功则说明当前线程成功获取到了这个不公平锁,那么通过『setExclusiveOwnerThread(current)』方法来标志当前线程为持有锁的线程,方法结束,返回true; b)如果失败,则说明有其他线程先获取到了这个锁,那么当前线程获取锁失败。方法结束,返回false。 ② "state != 0",则说明当前锁已经被某个线程所持有了,那么判断当前的线程是否就是持有锁的那个线程(『if (current == getExclusiveOwnerThread())』)。 a)如果持有锁的线程就是当前线程,因为ReentrantLock是一个可重入的锁,所以接下来继续判断预计递归获取锁的次数是否超过了限制的值(即,“nextc < 0”则说明预计递归获取锁的次数超过了限制值Integer.MAX_VALUE了),那么会抛出“Error”异常;否则将当前state的值设置为最新获取锁的次数(注意,这里不需要使用CAS的方式来修改state了,因为能操作到这里的一定就是当前持有锁的线程了,因此是不会发送多线程竞争的情况)。然后方法结束,返回true; b)如果持有锁的线程不是当前线程,那么当前线程获取锁失败。方法结束,返回false。

  • 『FairSync#lock』 VS 『NonfairSync#lock』 a)在公平锁的模式下,所有获取锁的线程必须是按照调用lock方法先后顺序来决定的,严格的说当有多个线程同时尝试获取同一个锁时,多个线程最终获取锁的先后顺序是由入队等待队列的顺序决定的,当然,第一个获取锁的线程是无需入队的,等待队列是用于存储那些尝试获取锁失败后的节点。并且按照FIFO的顺序让队列中的节点依次获取锁。 b)在非公平模式下,当执行lock时,无论当前等待队列中是否有等待获取锁的线程了,当前线程都会尝试直接去获取锁。 ?两点从『FairSync#lock』与『NonfairSync#lock』 实现的不同,以及『FairSync#tryAcquire』与『NonfairSync#tryAcquire』方法实现的不同中都能表现出来。

对于非公平锁:首先会尝试立即抢占获取锁(若锁当前没有被任何线程持有时,并且此时它会和当前同时首次尝试获取该锁的线程以及等待队列中尝试获取该锁的线程竞争),如果获取锁失败,则会被入队到等待队列中,此时就需要排队等待获取锁了。

在排他锁模式下,等待队列中的第一个等待获取锁的线程(即,前驱节点是head节点的节点),仅代表这个节点当前有竞争获取锁的权力,并不代表它会成功获取锁。因为它可能会同非公平获取锁的操作产生竞争。 继续回到『AbstractQueuedSynchronizer#acquire』方法,继续展开里面的实现。我们接着看『acquireQueued』方法是如何实现将已经入队的节点排队等待获取锁的。 『AbstractQueuedSynchronizer#acquireQueued』

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

用于已经入队的线程在排他不可中断模式下获取锁。同时也被用于给条件(condition)的wait方法来获取锁。 该方法不会因为中断的发送而返回,只有在获取锁的情况下才会返回,但是如果在等待获取锁的过程中,当前线程被标识为了中断,则在方法返回的时候返回true,否则方法返回是返回false。 ① 获取当前节点的前驱节点,如果前驱节点是head节点,则说明当前节点是队列中第一个等待获取锁的节点,那么就执行『tryAcquire』方法尝试获取排他锁,如果获取排他锁成功(即,tryAcquire方法返回true)则调用『setHead(node)』将当前节点设置为头节点。然后将p(即,旧的head节点)的next置null,有助于p被垃圾收集器收集。然后标识failed为false。结束方法调用,返回interrupted(该字段表示在等待获取锁的过程中,当前线程是否有被表示为中断了)。 ② 如果当前节点的前驱节点不是head节点,说明该节点前面已经有等待着获取这个排他锁的节点;或者当前节点的前驱节点是head节点,但是当前节点获取锁失败了,那么执行『shouldParkAfterFailedAcquire』方法,若该方法返回true,则说明本次获取排他锁失败需要阻塞/挂起当前线程,那么就调用『LockSupport.park(this);』将当前线程挂起,直到被唤醒,并且若挂起期间该线程被标志为了中断状态,则将interrupted标识为true。 ③ 当当前节点经过多次唤醒与挂起,终于成功获取锁后,则退出方法,并返回当前线程是否有被中断的标志。如果当前节点因为某些原因没有成功获取到锁,却要结束该方法了,那么调用『cancelAcquire(node)』方法将当前节点从等待队列中移除。因为方法结束了,说明当前节点不会被操作再去尝试获取锁了,那么就不应该作为一个有效节点放在等待队列中,应该被标识为无效的节点后从队列中移除。 『AbstractQueuedSynchronizer#setHead』

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

将node设置为队列的头节点,当它出队时。该方法只能被获取方法调用。仅将无用字段清空(即,置为null)以便于GC并废除不必要的通知和递归。 『AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire』

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

检查并修改一个节点的状态,当该节点获取锁失败时。返回true如果线程需要阻塞。这是主要的信号通知控制在所有的获取锁循环中。要求’pred’ == ‘node.prev’ ① 如果pred.waitStatus == Node.SIGNAL。则说明node的前驱节点已经被要求去通知释放它的后继节点,所以node可以安全的被挂起(park)。然后,退出方法,返回true。 ② 如果pred.waitStatus > 0。则说明node的前驱节点被取消了。那么跳过这个前驱节点并重新标志一个有效的前驱节点(即,waitStatus <= 0 的节点可作为有效的前驱节点),然后,退出方法,返回false。 ③ 其他情况下,即pred.waitStatus为’0’或’PROPAGATE’。表示我们需要一个通知信号(即,当前的node需要唤醒的通知),但是当前还不能挂起node。调用『compareAndSetWaitStatus(pred, ws, Node.SIGNAL)』方法通过CAS的方式来修改前驱节点的waitStatus为“SIGNAL”。退出方法,返回false。 我们需要一个通知信号,主要是因为当前线程要被挂起了(park)。而如果waitStatus已经是’SIGNAL’的话就无需修改,直接挂起就好,而如果waitStatus是’CANCELLED’的话,说明prev已经被取消了,是个无效节点了,那么无需修改这个无效节点的waitStatus,而是需要先找到一个有效的prev。因此,剩下的情况就只有当waitStatus为’0’和’PROPAGAET’了(注意,waitStatus为’CONDITION’是节点不在等待队列中,所以当下情况waitStatus不可能为’CONDITION’),这是我们需要将prev的waitStatus使用CAS的方式修改为’SIGNAL’,而且只有修改成功的情况下,当前的线程才能安全被挂起。 还值得注意的时,因此该方法的CAS操作都是没有自旋的,所以当它操作完CAS后都会返回false,在外层的方法中会使用自旋,当发现返回的是false时,会再次调用该方法,以检查保证有当前node有一个有效的prev,并且其waitStatus为’SIGNAL’,在此情况下当前的线程才会被挂起(park)。

unlock
public void unlock() {
    sync.release(1);
}

尝试去释放这个锁。 如果当前线程是持有这个锁的线程,那么将持有次数减少1。如果释放后当前的锁被持有的次数为0,那么锁被释放。如果当前线程不是持有锁的线程,那么抛出“IllegalMonitorStateException”异常。 『AbstractQueuedSynchronizer#release』

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

排他模式下的释放。该方法的实现通过解除一个或多个线程的的阻塞,如果『tryRelase』方法返回true的话。 该方法能够被用于实现『Lock#unlock』。 ① 调用『tryRelease(arg)』方法来尝试设置状态值来反映一个排他模式下的释放。如果操作成功,则进入步骤[2];否则,如果操作失败,则方法结束,返回false。 ② 在成功释放给定的状态值后,获取等待队列的头节点。如果头节点不为null并且头节点的waitStatus!=0(头节点的waitStatus要么是’0’,要么是’SIGNAL’),那么执行『unparkSuccessor(h)』来唤醒头节点的后继节点。(节点被唤醒后,会继续acquireQueued方法中流程) ③ 只要『tryRelease(arg)』释放操作成功,无论是否需要唤醒头结点的后继节点,方法结束都会返回true。 『tryRelease』

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

尝试通过设置状态值来反映一个在排他模式下的释放操作。 ①『Thread.currentThread() != getExclusiveOwnerThread()』如果执行释放操作的线程不是持有锁的线程,那么直接抛出“IllegalMonitorStateException”异常,方法结束;否则 ② 如果执行当前释放操作的线程是持有锁的线程,那么 a)计算新state的值,即当前释放操作后state的值,如果state为0,则说明当前线程完成释放了对该锁的持有,那么将锁的持有者重置为null(即,『setExclusiveOwnerThread(null)』)。然后通过『setState(c);』将AQS的state值设置为这个新的state值(即,0),结束方法,返回true,表示该锁现在没有线程持有,可以被重新获取。 b)如果新state的值不为0(即,大于0),则说明当前的线程并未完成释放该锁(因为reentrantLock是一个可重入的锁,所以一个线程可以多次获取这锁,而state值就表示这一线程获取锁的次数),那么通过『setState(c);』将AQS的state值设置为这个新的state值,结束方法,返回false。 可见对于『tryRelease』方法,释放锁操作失败是通过抛出“IllegalMonitorStateException”异常来表示的。该方法无论返回‘true’还是‘false’都表示本次的释放操作完成了。返回‘true’表示的是锁已经被当前线程完全释放了,其他线程可以继续争夺这个锁了,在完全释放锁的时候也会将锁中持有者字段重新置null;返回‘false’表示的是当前释放操作完成后,该线程还继续持有这该锁,此时其他线程是无法获取到这个锁的。 同时,我们可以知道,释放操作只能有持有锁的线程来完成,因此对于AQS state字段(一个volatile字段)的修改,不需要使用CAS来完成,只需要直接设置修改就好。 『AbstractQueuedSynchronizer#unparkSuccessor』

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

唤醒后继节点,如果存在的话 ① 如果状态值是负数,则在预期发信号通知时清除这个负数状态值。如果状态被等待的线程修改了或者清除负数状态值失败是允许。 ② 后继节点的线程被唤醒,后继节点通常就是下一个节点。但是如果下一个节点被取消了或者下一个节点为null,则从队列尾(tail)往前遍历去找真实的未取消的后继节点。 『(s == null || s.waitStatus > 0)』:说明下一个节点为null或被取消了(waitStatus允许的状态值中,只有’CANCELLED’是>0的)。那么,就从队列尾(tail)开始向前遍历,获取第一个非空且未被取消的节点。如果存在这样的一个后继节点的话(即,“s != null”),则执行『LockSupport.unpark(s.thread);』操作来唤醒这个节点的线程,此时等待队列中第一个等待的线程就会被重新启动,流程会回到『acquireQueued』方法,该线程会重新重试获取该锁,如果成功acquireQueued方法返回,否则线程会再次被挂起,等待下次唤醒后再去再去竞争获取锁。

Q:关于node的waitStatus为’CANCELLED’的情况? A:关于node的waitStatus为’CANCELLED’的情况:比如,当这个node被中断了,或者设置的超时时间到了,那么说明这个线程获取锁失败,那么此时就应该将其设置为cancelled,因为如果该线程还需要获取锁的话,会重新调用获取锁的方法,而获取锁的方法就是创建一个新的node的。所以,那么线程获取锁失败的时候就会将这个node的waitStatus设置为’CANCELLED’,一个被取消的线程绝不会获取锁成功,一个线程只能被它自己取消,不能被其他线程取消。

Q:关于node为null的情况? A:关于node为null的情况:比如,一个入队操作(enq)不会被分配到前驱节点的next字段,直到tail成功指向当前节点之后(通过CAS来将tail指向当前节点。『enq』方法实现中,会先将node.prev = oldTailNode;在需要在CAS成功之后,即tail = node之后,再将oldTailNode.next = node;),所以当看到next字段为null时并不意味着当前节点是队列的尾部了。无论如何,如果一个next字段显示为null,我们能够从队列尾向前扫描进行复核。

Q:对于ReentrantLock无论是公平锁还是非公平锁,在入队时waitStatus都是什么?? 能确定的是从条件等待队列转移到锁的同步队列的时候,节点的waitStatus是’0’。 A:无论是公平锁还是非公平锁,在构建一个node的时候,waitStatus都是默认值’0’。然后在将node入队到锁的等待队列中后就会执行『acquireQueued』来等待获取锁,而该方法会修改当前节点的前驱节点的waitStatus(即,『shouldParkAfterFailedAcquire(p, node)』方法)。在当前节点无法获取锁的时候需要被挂起前会将其前驱节点的waitStatus设置为’Node.SIGNAL’。这样在释放操作中(『release』),如果释放后发现锁的state为’0’,则说明锁当前可以被其他线程获取了,那么就会获取锁的等待队列的head节点,如果head节点的waitStatus!=0(即,head的waitStatus为’Node.SIGNAL’或’Node.PROPAGATE’,其中’Node.PROPAGATE’是共享模式下head节点的waitStatus可能的值,在排他模式下,head节点的waitStatus是’Node.SIGNAL’或’0’),那么说明head节点后面有等待唤醒获取锁的线程,那么调用『unparkSuccessor』方法来唤醒head节点的后继节点。

在排他锁模式下,head节点的waitStatus不是在该节点被设置为head节点的时候修改的。而是如果有节点入队到等待队列中,并且此时该节点无法获取锁,那么会将其前驱节点的waitStatus设置为’Node.SIGNAL’后,该节点对应的线程就被挂起了。所以也就是说,如果head节点后还有节点等待获取锁,那么此时head节点的waitStatus自然会使’Node.SIGNAL’,这是在head节点的后继节点入队后等待获取锁的过程中设置的。而将一个节点设置为head节点,仅是将该节点赋值给head节点,并将thread和prev属性会被置null。

ConditionObject ———— 条件对象

条件对象用来管理那些已经获得了一个锁,但是却不能做有用工作的线程。

Condition实现是AbstractQueuedSynchronizer作为一个Lock实现的基础。 该类的方法文档描述了机制,而不是从锁和条件(Condition)用户的观点来指定行为规范。该类所暴露的版本通常需要伴随着依赖于描述相关AbstractQueuedSynchronizer的条件语义的文档。 这个类是可序列化的,但是所有字段都是transient的,所以反序列化的conditions没有等待者。

注意,关于在ConditionObject中的描述,若无特殊说明“等待队列”均指“条件等待队列”,同锁的等待队列不同!

条件等待队列中有效节点的waitStatus只能是“Node.CONDITION”,这说明,如果发现条件等待队列中的节点waitStatus!=“Node.CONDITION”,则说明这个节点被取消等待条件了,那么应该将其出条件等待队列中移除。

// 等待队列的头节点
private transient Node firstWaiter;

// 等待队列的尾节点
private transient Node lastWaiter;
  • 条件对象初始化
Condition condition = lock.newCondition()

『newCondition』

public Condition newCondition() {
    return sync.newCondition();
}

返回一个用于这个Lock实例的Condition实例。 返回的Condition实例与内置的监视器锁一起使用时,支持同Object监控器方法(『Object#wait()』、『Object#notify』、『Object#notifyAll』)相同的用法。 如果这个锁没有被持有的话任何Condition等待(『Condition#await()』)或者通知(『Condition#signal』)方法被调用,那么一个“IllegalMonitorStateException”异常将会抛出。(也就说是说,Condition是用在锁已经被持有的情况下)

当一个condition的等待方法被调用(『Condition#await()』),锁会被释放,并且在这个方法返回之前,锁会被重新获取并且锁的持有次数会重新存储为这个方法被调用到时候的值。 如果一个线程在等待期间被中断了,那么等待将会结束,一个“InterruptedException”异常将会抛出,并且线程的中断状态将被清除。 等待线程被以先进先出(FIFO)的顺序被通知。 等待获得锁的线程和调用await方法的线程存在本质上的不同。一旦一个线程调用await方法,它进入该条件的等待集。当锁可用时,该线程不能马上解除阻塞。相反,它处于阻塞状态,直到另一个线程调用同一条件上的signalAll方法时为止。这一调用重新激活因为这一条件而等待的所有线程。当这些线程从等待集当中移出时,它们再次成为可运行的,调度器将再次激活它们。同时,它们将试图重新进入该对象。一旦锁成为可用的,它们中的某个将从await调用返回,获得该锁并从被阻塞的地方继续执行。

  • await 『AbstractQueuedSynchronizer#await』
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

实现可中断的条件等待。 1.如果当前线程被中断了,则抛出“InterruptedException”异常。 2.保存通过『getState』返回的锁的状态(state) 3.调用『release』方法,使用保存的state作为参数,如果该方法失败则抛出“IllegalMonitorStateException”异常 4.线程阻塞直到收到唤醒通知或者线程被中断 5.通过调用使用保存状态为参数的指定版本的『acquire』方法来重新获取锁。 6.如果在步骤4中线程在堵塞的时候被中断了,那么抛出“InterruptedException”异常。

① 首先一进入该方法会先判断当前线程是否被标志为了中断状态,如果是则抛出“InterruptedException”异常,并且中断状态被清除。 ② 调用『addConditionWaiter』添加一个新的等待节点到条件等待队列中。 ③ 调用『fullyRelease(node)』使用锁当前的状态值执行释放,并返回这个状态值。(即,该方法调用完后,并执行成功的话,那么此时其他线程可以去获取这个锁了,可见await方法会使当前线程放弃对锁的持有。同时返回锁在释放前的状态值。) ④ 自旋判断创建的等待节点是否在所的同步队列中了,如果没有(则说明节点还未被信号通知,以从条件等待队列中转移到锁的同步队列中),则执行『LockSupport.park(this);』挂起当前线程,线程会一直被挂起直到被信号通知唤醒(『signal()』或『signalAll()』方法会将节点从条件等待队列转移到锁的同步队列中,并根据加入到同步队列后得到的前驱节点的waitStatus,可能会去唤醒当前线程;或者当锁的同步等待队列中的线程依次获取锁并释放后,直到轮到当前线程成为同步队列中第一个等待获取锁的线程时,当前线程或被唤醒)。接着判断线程是否发生中断,如果发送中断,则退出自旋;否则继续自旋重新执行本步骤的流程,直至新创建的等待节点被转移到锁的同步队列中。 ⑤ 执行『acquireQueued(node, savedState)』方法在排他模式下从等待队列中的线程获取锁,并且在获取锁后将锁的状态值设置为给定’savedState’(由于’savedState’就是上面因条件等待而释放锁前该线程获取锁的次数,而在该线程重新获取锁,继续await之后的流程时,保持了该线程在await之前持有锁的状态)。并且该方法会在获取锁的情况下才会返回: a)若在等待获取锁的过程中,当前线程被标识为了中断,则在方法返回的时候返回true;接着判断interruptMode是否等于“THROW_IE”,如果为true,则说明节点的等待在得到唤醒通知之前就被取消了,此时interruptMode为“THROW_IE”;否则interruptMode!=THROW_IE,则说明节点的等待在得到唤醒通知之后才被取消了,那么设置interruptMode为“REINTERRUPT”,继续步骤[6] b)若在等待获取锁的过程中,当前线程未被标识为中断,则继续步骤[6] (这里一个正常的未被中断的流程就是,await的节点对应的线程会在步骤[4]被挂起,然后在某一个时刻因为signalAll()方法调用,该节点被转移到了锁的等待队列中。然后当该线程为锁的等待队列中第一个等待获取锁的线程时,会被它的前驱节点唤醒,此时节点被唤醒,判断得到已经在等待队列中了,那么结束步骤[4]的自旋,进入的步骤[5],调用『acquireQueued(node, savedState)』尝试获取锁,此时节点已经具有获取锁的权限了,如果成功获取锁流程继续,否则节点会被再次挂起,acquireQueued方法会阻塞直到当前线程获取锁的情况下返回。) ⑥ 如果node节点的nextWaiter非null,那么执行『unlinkCancelledWaiters();』来清除等待队列中被取消的节点。 因为,如果node节点是通过signal/signalAll信号通知而从条件等待队列转移到锁的同步队列的话,那么node的nextWaiter是为null(在signal/signalAll方法中会将该字段置为null);否则如果是因为中断而将节点从条件等待队列转移到锁的同步队列的话,此时nextWaiter是不会被重置的,它依旧指向该节点在条件等待队列中的下一个节点。 ⑦ 如果中断模式标志不为’0’(即,“interruptMode != 0”),则根据给定的中断模式(interruptMode)在等待结束后报告中断(『reportInterruptAfterWait(interruptMode)』)

因此,从『await()』方法中,我们可以得知下面几点:

  1. 创建一个条件等待节点,并加入条件等待队列尾部。
  2. 彻底释放当前线程所持有锁(因为,首先只有在持有锁的情况下才可以执行await操作,再者ReentrantLock是一个可重入的锁,因此同一个线程可以多次获取锁),这样锁就可以被其他线程获取。但会记录这个线程在彻底释放锁之前持有该锁的次数(即,锁的state值)
  3. 在该线程再次获取该锁时,会将锁的state设置为释放之前的值。即,从await()条件等待返回的时候,当前线程对锁持有的状态同await()等待条件之前是一致的。
  4. 节点从条件等待队列中转移到锁的同步队列是两种情况: a)收到了信号通知,即signal/signalAll b)在未收到信号通知之前,检测到了当前线程被中断的标志。
  5. 在当前线程重新获取到锁,准备从await方法返回的时候,await方法的返回也分两种情况: a)在条件等待中的节点是通过signal/signalAll信号通知转移到锁的同步队列的,然后再在同步队列中根据FIFO的顺序来重新获取到了该锁。那么此时await方法正常返回。(在信号通知之后线程可能被标志位中断,但这不影响方法的正常返回) b)在条件等待中节点是因为当前线程被标志为了中断而将其转移到了锁的同步队列中,这样在当前线程再次重新获取锁时,方法会异常返回,即抛出“InterruptedException”异常。 接下来对,『await』中的源码细节进一步展开 『AbstractQueuedSynchronizer#addConditionWaiter』
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

添加一个新的等待者到等待队列中。 ①『Node t = lastWaiter;』:获取等待队列的中的最后一个节点, ②『t != null && t.waitStatus != Node.CONDITION』如果这个节点被取消了,那么调用『unlinkCancelledWaiters();』方法将等待队列中被取消的节点移除。并重新获取等待队列中的最后一个节点(『t = lastWaiter』) ③ 为当前线程创建一个waitStatus为“Node.CONDITION”的节点。 ④ 将新创建好的节点加入到等待队列的尾部: a)如当前等待队列为空(即,上面获取的t为null,也就是说,当等待队列尾指针为null时,则说明此时等待队列为空)那么需要先初始化firstWaiter,将其指向这个新创建的节点。然后将lastWaiter也指向这个新创建的节点。此时等待队列中只有一个节点,firstWaiter和lastWaiter都指向这个节点。 b)将等待队列中最后一个节点的next属性指向当前这个新创建的节点,然后将lastWaiter指向当前这个新创建的节点。 ⑤ 返回新创建的等待节点。 『AbstractQueuedSynchronizer#fullyRelease』

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

使用当前的状态值执行释放;返回保存的状态值。 若操作失败,则取消节点,并抛出异常。 ①『int savedState = getState();』获取当前锁的状态值 ② 使用获取的状态值执行释放操作『release(savedState)』,如果操作成功,则方法结束,返回释放使用的保存的状态值;如果操作失败,则抛出“IllegalMonitorStateException”异常,并取消node节点,即,将节点node的waitStatus设置为“Node.CANCELLED”。 『AbstractQueuedSynchronizer#isOnSyncQueue』

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}

返回true,如果一个节点总是初始化于条件队列中,并且当前在同步队列中等待获取锁。 ① 如果node的waitStatus为“Node.CONDITION”或者node的prev为null,则说明node节点当前还没有入队同步队列,方法结束,返回false;否则步骤[2] ② 接着判断『if (node.next != null)』,如果为true,则说明node已经入队完毕,则方法结束,返回true。否则步骤[3] ③ 调用『findNodeFromTail(node)』从同步队列尾开始寻找节点。此时,node.prev非null,但是由于通过CAS将节点入队的操作可能失败导致当前节点还未在同步队列中(即,节点入队操作还未完成)。所以我们需要从同步队列尾部开始向前遍历以明确该节点是否在同步队列中。在这种方法的调用中,节点总是靠近尾部,除非CAS失败(不太可能),否则节点将在同步队列尾部附近,所以我们几乎不会经历很多遍历。 『AbstractQueuedSynchronizer#findNodeFromTail』

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

从同步队列尾向前查询节点,如果节点在同步队列中,则返回true。 仅在『isOnSyncQueue』方法内调用该方法。 从锁的等待队列尾部开始向前遍历,如果找到node节点则返回true;否则遍历完整个等待队列也就没法找到node节点,则返回false。 『AbstractQueuedSynchronizer#checkInterruptWhileWaiting』

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

用于检测中断,如果中断在信号通知之前发生则返回“THROW_IE”,若中断在信号通知之后发生则返回“REINTERRUPT”,或者如果没有被中断则返回“0”。 『AbstractQueuedSynchronizer#transferAfterCancelledWait』

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

如果需要的话,在取消等待后,将节点转移到同步队列。如果线程在被信号通知之前取消等待了则返回true。 ① 通过CAS的方式将节点的状态从“Node.CONDITION”修改为“0”,如果成功,则说明节点此时还没有收到信号通知,此时将节点的waitStatus从“Node.CONDITION”修改为“0”就是在被信号通知前取消了节点对条件的等待,接着调用『enq(node)』将节点入队到锁的等待队列中,并结束方法,返回true。 ② CAS操作失败,则说明该等待条件的节点被其他线程信号通知了(一般是signalAll),那么自旋调用『isOnSyncQueue(node)』以确保节点入队(锁的等待队列)完成后退出自旋(因为取消等待条件期间一个未完成的转换是罕见且瞬间的时期,所以使用自旋即可)。然后方法结束,返回false。 也就是说,首先该方法会确保node从条件等待队列转移到锁的同步队列中。node是因为该方法的执行而从条件等待队列转移到锁的同步队列的话,则返回true;否则如果node是因为signal/signalAll信号通知而从条件等待队列转移到锁的同步队列的话,则返回false。 『AbstractQueuedSynchronizer#enq』

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

使用自旋锁的方式(自旋+CAS)插入节点到等待队列,如果等待队列为空则初始化队列。 初始化队列:创建一个空节点(即,new Node()),将head和tail都指向这个节点。 然后才是将我们待插入的节点插入,即:emptyNode -> newNode. head指向emptyNode,tail指向newNode。 『AbstractQueuedSynchronizer#unlinkCancelledWaiters』

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

从等待队列中解除被取消等待节点的连接。该方法仅在持有锁的时候调用。这个方法调用发生在取消发生在等待条件期间,并根据一个新的等待节点插入时lastWaiter看起来已经被取消了。这个方法需要去避免垃圾的滞留在没有信号通知的时候。所以即便它可能需要一个完全遍历,这仅会在超时和取消发生在缺少通知的情况下发生。它会遍历所有的节点而非停止在一个指定的目标,以便在取消风暴期间不需要多次重新遍历就可以将所有的垃圾节点解除链接。 该方法会从firstWaiter开始遍历整个等待队列,将被取消(即,waitStatus != Node.CONDITION)的节点从等待队列中移除。 ①『Node t = firstWaiter;』:获取等待队列的头结点。 ② 从头节点开始遍历等待队列。 ③『Node next = t.nextWaiter;』获取当前遍历节点的下一个节点。 ④ 如果当前节点被取消了(即,『t.waitStatus != Node.CONDITION』),那么将当前节点的next字段置null(便于垃圾回收)。然后判断『trail == null』,如果为true,则说明目前是头节点被取消了,那么设置『firstWaiter=next』,即当前节点的下一个节点。此时,next节点可能是一个有效节点,也可能是一个被取消的节点(如果是被取消的节点,会在下一次循环的时候再次重新设置firstWaiter),也可能是一个null(如果为null,接下来就会退出循环,说明等待队列为空了);如果『trail == null』为false,则说明此遍历到的被取消的节点不是头节点,并且trail指向了遍历到目前为止等待队列中最后一个有效的等待节点,那么执行『trail.nextWaiter = next;』以将当前正在被遍历的节点从等待队列中解除连接。接着判断『next == null』,若为true,则说明当前遍历的被取消的节点是等待队列的最后一个节点,那么执行『lastWaiter = trail;』将lastWaiter指向最后一个有效的等待节点。 ⑤ 如果当前节点没有被取消(即,『t.waitStatus == Node.CONDITION』),那么将trail置为t,这说明了trail指向了在遍历等待队列过程中的最后一个有效的等待节点。 ⑥ 将t置为next,即当前遍历节点的下一个节点。继续步骤[3],直至整个等待队列节点都遍历完(即,next为null)。

signal/signalAll
  • ** signal**
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

将当前的条件等待队列中将等待时间最长的线程移动到锁的等待队列中,如果存在这么一个线程的话。 ① 判断执行该方法的当前线程是否是持有排他锁的线程,如果不是则抛出“IllegalMonitorStateException”异常。 ② 当执行该方法的线程是持有排他锁的线程时,获取条件等待队列中的第一个等待节点,若这个节点不为null,则执行『doSignal(first)』来信号通知这个节点。 注意,因为条件等待节点是按照FIFO的顺序操作节点的,也就是新的等待节点总是会添加对队列尾部,所以队列头节点就是等待最长时间的节点。 『AbstractQueuedSynchronizer#doSignal』

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

删除并转移节点,直到命中一个未被取消的节点或者节点为null(节点为null,说明等待队列中已经没有一个有效的节点了,即等待队列要么为空,要么等待队列中的节点都是被取消的节点)。 ① 根据给定的first节点,为起始遍历直至获取第一个有效等待节点,并信号通知该节点。 ② 将当前节点的下一个等待节点(nextWaiter)设置为firstWaiter,然后判断firstWaiter是否为null,如果为null则说明当前节点已经是条件等待队列中的最后一个节点了,那么将lastWaiter也置为null。 ③ 将当前遍历节点的nextWaiter置为null(以便于当前节点在方法结束后被垃圾收集器回收) ④ 执行『transferForSignal』将节点从条件等待队列转移到同步队列队列中,如果操作成功,则当前循环结束,方法返回;如果操作失败,那么继续从头节点开始循环步骤[2],直到成功转移一个节点或者条件等待队列为空为止。

  • signalAll
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

将条件等待队列中的所有线程转移到锁的等待队列中。 ① 判断执行该方法的当前线程是否是持有排他锁的线程,如果不是则抛出“IllegalMonitorStateException”异常。 ② 当执行该方法的线程是持有排他锁的线程时,获取条件等待队列中的第一个等待节点,若这个节点不为null,则执行『doSignalAll(first)』来信号通知所有的节点。 『AbstractQueuedSynchronizer#doSignalAll』

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

删除并转移所有的节点。 ① 将lastWaiter和firstWaiter置为null ② 从给定的节点为起始,开始遍历节点,调用『transferForSignal(first);』来将节点从条件等待队列中转移到锁的等待队列中。 『isHeldExclusively』

protected final boolean isHeldExclusively() {
    // While we must in general read state before owner,
    // we don't need to do so to check if current thread is owner
    return getExclusiveOwnerThread() == Thread.currentThread();
}

判断执行该方法的当前线程是否是持有排他锁的线程。 如下情况,返回’true’: a)执行该方法的线程就是持有排他锁的线程。 如下情况,返回’false’: a)执行该方法的线程不是持有排他锁的线程。 b)当前排他锁没有被任何线程所持有。 『AbstractQueuedSynchronizer#transferForSignal』

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

从条件等待队列转移一个节点到同步队列中。如果成功返回true 返回: a)true:成功从条件队列中转移一个节点到同步队列中 b)false:在释放信号通知之前,该节点被取消了。 ① 通过CAS的方式将需要转移的节点的状态从“Node.CONDITION”修改为“0”。如果CAS操作失败,则说明这个节点的已经被取消了。那么方法结束,返回false。 ② 将修改完状态后的节点加入到锁等待队列中(『enq(node)』),并得到加入到等待队列后,当前节点的前驱节点。 ③ 若前驱节点的"waitStatus > 0”(即,waitStatus为“CANCELLED”)或者通过CAS的方式将前驱节点的waitStatus修改为“SIGNAL”失败,则调用『LockSupport.unpark(node.thread);』将当前线程唤醒(唤醒后的线程会继续await中被挂起之后的流程)。 ④ 否则,"waitStatus <= 0”并且通过CAS成功将前驱节点的waitStatus修改为了“SIGNAL”,以此来标识当前线程正在等待获取锁。

  • 『signal』vs『signalAll』 signal解除的是条件等待队列中第一个有效的节点(即,节点的waitStatus为“CONDITION”),这比解除所有线程的阻塞更加有效,但也存在危险。如果signal的线程发现自己仍然不能运行,那么它再次被阻塞(await)。如果没有其他线程再次调用signal,那么系统就死锁了。

signal/signalAll方法本质上只是将条件等待队列中的节点转移到锁的同步队列中。因此,不能任务signal/signalAll方法调用后就会使得线程获取锁,线程什么时候获取锁,就是根据锁的同步队列FIFO的顺序来决定的,只有同步队列中的第一个线程才有权利去争夺获取锁。

后记

如果文章有错不吝指教 :)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏JAVA烂猪皮

使用并发工具实现 RPC 调用流量控制

RPC 服务中,每个服务的容量都是有限的,即资源有限,只能承受住给定的网络请求,所以,在设计 RPC 框架的时候,一定要考虑流量控制这个问题。而 Java 中,...

591
来自专栏我是攻城师

解读Java阻塞队列BlockingQueue的实现

上篇文章我们介绍了队列的基类接口Queue它定义了所有实现队列的类必须拥有的方法行为而BlockingQueue阻塞队列接口继承了Queue接口,此外Block...

1082
来自专栏流柯技术学院

Python多线程学习

1、  函数式:调用thread模块中的start_new_thread()函数来产生新线程。如下例:

661
来自专栏JAVA烂猪皮

JAVA多线程与并发学习总结

使用高速缓存来作为内存与处理器之间的缓冲,将运算需要用到的数据复制到缓存中,让计算能快速进行;当运算结束后再从缓存同步回内存之中,这样处理器就无需等待缓慢的内存...

411
来自专栏我是攻城师

理解AbstractQueuedSynchronizer提供的独占锁和共享锁语义

Doug Lea前辈在JDK5中编写的AbstractQueuedSynchronizer抽象同步框架非常精辟,整个代码里没有使用像synchronized这样...

982
来自专栏一个会写诗的程序员的博客

【Java 并发】 之 AQS 详解 & volatile关键字CPU内存架构volatile关键字的作用

谈到并发,不得不谈ReentrantLock;而谈到ReentrantLock,不得不谈AbstractQueuedSynchronizer(AQS)!

813
来自专栏古时的风筝

Java 开发, volatile 你必须了解一下

872
来自专栏Android 研究

Android Handler机制1之Thread

每一个进程都有自己的独立的一块内存空间、一组资源系统。其内部数据和状态都是完全独立的。进程的优点是提高CPU的运行效率,在同一个时间内执行多个程序,即并发执行。...

742
来自专栏程序猿DD

一文让你秒懂AQS,附带源码剖析!

AbstractQueuedSynchronizer,简称AQS,是Doug Lea大师创作的用来构建锁或者其他同步组件(信号量、事件等)的基础框架类。

863
来自专栏木木玲

CountDownLatch 源码浅析

1646

扫码关注云+社区