原本是想继续进行Elasticsearch源码分析的,但这几天面试中有被问到AQS的源码部分,于是利用周末空闲时间对其中的源码进行了一个系统化的分析。首先我们应该知道 AQS是一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。AQS内部通过双向链表来维护节点对锁的竞争,基于CLH队列锁改进而来(CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。关于CLH锁与MCS锁的区别可以参考链接:https://mp.weixin.qq.com/s/WzO6DbVH6EApryGFC23aZA,这里需要注意的是AQS对CLH已经做了很大的改进)。AQS分为共享模式和独占模式,内部基于volatile内存可见性和CAS原子性操作实现线程间通信操作。本篇文章主要结合ReentrantLock的使用来对AQS的源码进行相应的分析。
先就jdk中的AQS的子类列举如下:
本篇我们主要分析一下ReentrantLock。
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); }
/** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
创建方式:
ReentrantLock reentrantLock = new ReentrantLock();
加锁方法
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.nonfairTryAcquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
在该方法内部会调用非公平锁java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire和 acquireQueued和addWaiter方法,这些方法会在后面进行分析。
首先来看java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
非公平锁的实现相对简单,我们直接看代码:
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
我们主要看下java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire获取锁的方法,代码如下:
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ @ReservedStackAccess final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获取当前状态 int c = getState(); // 如果是初始值 if (c == 0) { // cas设置state值 if (compareAndSetState(0, acquires)) { // 设置成功则将当前线程设置为锁的排他线程 setExclusiveOwnerThread(current); //返回成功 return true; } } // 如果当前线程就是排他锁的当前拥有者,允许它再次获取锁,并修改获取锁的次数state的值 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
如果当前排他锁还未被线程占有,则尝试对申请线程赋予锁的许可,并把当前线程设置成锁的拥有者。如果尝试获取锁的线程就是排他锁的当前拥有者,允许它再次获取锁,并修改获取锁的次数state的值。
这里对于非公平锁的lock方法,它内部调用的是AbstractQueuedSynchronizer#acquire方法,这个方法内部调用的是!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法,对于公平锁和非公平锁来说,只是tryAcquire方法实现不同,其他方法调用的都是父类Sync和AbstractQueuedSynchronizer中的方法。
AQS中的Node节点的几种模式:
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;
创建方法:
ReentrantLock reentrantLock = new ReentrantLock(true);
加锁方法:
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.nonfairTryAcquire(1); }
可以看出这里加锁的方法与非公平锁是相同的,只是两者的tryAcquire方法实现不同。
这里再来看一下java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire方法:
/** 以独占模式来获取锁 * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { //如果tryAcquire方法返回false才会进入第二步判断,如果tryAcquire的结果为true代表已经获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire方法:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果当前节点的先前节点不是头节点就进行cas操作state设置为acquires的值, // cas操作成功后将当前线程设置为锁的独占线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 进入这个分支代表是一个线程第二次来获取锁的占有权 // 如果当前线程就是锁的独占线程时,则继续让该线程获取锁的占有权 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
在这里先插一段waitStatus的定义(注意:waitStatus也是volatile修饰):
/** waitStatus value to indicate thread has cancelled. */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking. */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition. */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate. */ static final int PROPAGATE = -3;
关于java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors 方法,对于这个方法在注释里描述的很清楚了,是用来查询是否有别的线程等待获取锁的操作比当前线程等待得更久。具体代码:
public final boolean hasQueuedPredecessors() { Node h, s; // 如果head节点为null,则返回false。否则往下进行。 if ((h = head) != null) { // 如果头节点的后置节点为null或者waitStatus为canceled,则从tail节点开始往向遍历,去除掉所有的处于canceled状态的节点 if ((s = h.next) == null || s.waitStatus > 0) { s = null; // traverse in case of concurrent cancellation for (Node p = tail; p != h && p != null; p = p.prev) { if (p.waitStatus <= 0) s = p;//最终s指向的是离头节点最近的那个节点 } } // 如果头节点的next节点不为null并且它的线程不是当前线程,则代表当前来竞争的节点也不是head节点的next节点。如果s==null则表明head节点的next节点为null,s.thread == Thread.currentThread()则表明head节点的next节点的自旋线程就当前节点获取许可对应的线程,这两种情况下都表明当前节点\线程的前一个节点\线程就是head,所以会返回最终的false,允许去做cas操作 if (s != null && s.thread != Thread.currentThread()) return true; } // 如果头节点为null,则表示当前节点可以去用cas竞争锁,故返回false,让tryAcquire中第一个if语句中的第二个条件函数得以执行 return false; }
在这里我们梳理一下整个初始化流程,在创建ReentrantLock实例之后,第一次调用acquire方法时会调用下面这段代码(为了讲清楚,请容忍一下重复代码):
public final void acquire(int arg) { //如果tryAcquire方法返回false才会进入第二步判断,如果tryAcquire的结果为true代表已经获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
我们来回过头看下FairSync的tryAcquire方法,它在第一次调用hasQueuedPredecessors()方法时,由于此时head节点为null,所以会直接返回false,然后它会进行的操作和非公平锁获取锁的方式相同。第一次的tryAcquire会返回true,那么在acquire方法里的!tryAcquire(arg)的结果为false,那么后面的&&操作不会进行判断,acquire方法结束。但是如果此时有另一个并发的线程也在调用acquire方法,进入判断中的tryAcquire方法时返回的结果为false,此时接着会进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,这一点在非公平锁中也是一样流程。
属性:
volatile int waitStatus;//用来描述状态的字段,头节点是不能处于cancel状态的,它的初始值为0,condition节点的状态为CONDITION
//指向当前节点/线程的前一个节点,当前节点依赖它来校验waitStatus。在入队列时赋值,在出队列时可能会被置为null,为了help GC.同时在一个pre节点处于cancel状态时,我们一直循环直到找到一个一直存在的不会cancel的节点,这样的节点一定会存在,因为head节点是不能处于cancel状态的。一个节点只有在成功获取到锁之后才会成为头节点。一个处于cancelled的状态的线程是不会获取锁成功的,并且一个线程只能cancel自己,对其他节点无法产生作用 volatile Node prev;
//用于指向当前拥有锁的节点/线程的后继节点,在当前节点释放锁时会被唤醒。在入队列时被赋值,在前置节点处于cancle状态和出队列置为null(help GC)时会进行调整。enq操作不会给pre节点的next属性赋值直到被赋值后,所以看到节点的next属性为null并不代表这个节点在队列的尾部。但是如果一个节点的next属性为null,我们可以从尾结点开始扫描所有的前置节点来double-check。处于cancelled状态的节点的next属性会指向它自己而不是null,这样可以让isOnSyncQueue方法更好操作一些。 volatile Node next;
// 执行当前节点入队列操作的线程。在构造方法中初始化并且在使用完之后置为null volatile Thread thread;
// 指向下一个在condition上等待的节点,或者是特殊值SHARED。因为condition队列只有在独占模式下能被进入,我们只需要一个简单的链表队列来保存处于各种条件上等待中的节点。它们会被重新转入队列中去重新进行acquire。并且因为conditions只能是独占的,所以我们保存一个属性通过使用特殊值来标明共享模式 Node nextWaiter;
// 当节点处于共享模式的等待状态时返回true final boolean isShared() { return nextWaiter == SHARED; }
构造方法:
/** Establishes initial head or SHARED marker. */ Node() {}
/** Constructor used by addWaiter. */ Node(Node nextWaiter) { this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); }
/** Constructor used by addConditionWaiter. */ Node(int waitStatus) { WAITSTATUS.set(this, waitStatus); THREAD.set(this, Thread.currentThread()); }
再回顾下AbstractQueuedSynchronizer的几个属性和初始值:
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head;
/** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail;
/** * The synchronization state. */ private volatile int state;
注意:这两个方法对于公平锁和非公平锁都是一样的
addWaiter方法:
看完上面的分析,再看addWaiter(Node.EXCLUSIVE)方法就很好理解了,代码如下:
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 创建一个指定mode的Node对象 Node node = new Node(mode);
for (;;) { // 用一个变量oldTail保存当前队列尾节点 Node oldTail = tail; if (oldTail != null) { // 尾节点不为空时,通过MethodHandles设置新创建的节点的pre节点为队列之前的尾节点 node.setPrevRelaxed(oldTail); // 为什么要用oldTail来保存tail,而不直接用tail?这样能让多个竞争线程在本地变量上进行自旋,而不用竞争同一个tail变量来进行cas自旋 // 这里会将当前aqs对象的tail指向node if (compareAndSetTail(oldTail, node)) { // cas成功的会将oldTail也就是当前aqs的tail 节点的next节点指向当前节点 // cas不成功的线程会继续当前for循环,直到线程对应的节点被添加到队列中 // 与上面的node.setPrevRelaxed(oldTail)相对应,是一个双向链表 oldTail.next = node; return node; } } else { // 当尾节点为null时会进行队列的初始化,初始化之后不会跳出循环,还会继续上面的流程,直到执行到return initializeSyncQueue(); } } }
第一次进入时,oldTail为null,会先进入initializeSyncQueue()方法,代码如下:
/** * Initializes head and tail fields on first contention. */ private final void initializeSyncQueue() { Node h; if (HEAD.compareAndSet(this, null, (h = new Node()))) tail = h; }
这个方法主要用来进行head和tail的第一次初始化操作。cas成功的线程会将tail和head节点都初始化为new Node。初始化之后会继续addWaiter(Node mode)方法的循环,直到线程对应的节点被添加到队列中。注意观察,这两步中head节点与当前线程节点没有产生关系,只是将tail节点的next节点指向了它,并将它的pre节点指向了tail节点。
到这里再来接着看java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued方法:
// 队列中已经存在的线程会以排他不可中断的模式进行acquire操作。被用于condition的wait方法和acquire方法 final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { // 获取节点的前置节点,如果是第一个来获取锁的线程节点,它的前置节点就是tail节点,而这时tail节点和head节点是一样的指向 final Node p = node.predecessor(); //如果当前节点的前置节点等于当前aqs对象的head节点,并且执行tryAcquire(arg)方法成功(代表获取许可成功) if (p == head && tryAcquire(arg)) { // 会将head节点设置为当前节点,关于setHead方法见下面,它会将头节点指向当前node,然后将node节点的thread和prev属性置为null,因为头节点就是拥有锁许可的那个 setHead(node); // 将p节点的next节点置为null,应该是在cms回收器中能防止浮动垃圾的产生 p.next = null; // help GC return interrupted; } // 否则会在获取失败后进行等待 if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } }
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
/** * * @param pred node's predecessor holding status 节点的前置节点 * @param node the node // 当前节点 * @return {@code true} if thread should block */ // 校验和更新acquire失败的节点的状态,如果线程需要阻塞则返回true。这是acquire循环中主要的信号控制。要求pred == node.prev private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前置节点的waitStatus,初始值为0 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //只有canceled状态下这个状态值为>0 参考:static final int CANCELLED = 1; do { // 将node的前置节点指向前置节点的前置节点pred,直到这个pred的waitStatus不再处于cancel状态,可能会一直到head节点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 在这中间相当于把node节点链中的canceled状态的节点都去掉了,最终将pred节点的next指向node节点 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //到这个分支的waitStatus肯定是0或者PROPAGATE对应的-3。这标识着我们需要一个signal,但是不进行等待。调用者需要去重试来确保它在进入阻塞等待前不能够获得锁许可时才会进入阻塞等待。 pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; }
/** * Convenience method to park and then check if interrupted. * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { // 阻塞等待的便捷方法 // 如果某线程A调用park,那么除非另外一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。unpark操作可以再park操作之前。也就是说,先提供许可。当某线程调用park时,已经有许可了,它就消费这个许可,然后可以继续运行。这其实是必须的。考虑最简单的生产者(Producer)消费者(Consumer)模型。但是这个“许可”是不能叠加的,“许可”是一次性的。详见:https://www.jianshu.com/p/e3afe8ab8364 LockSupport.park(this); // 检查线程是否中断 return Thread.interrupted(); }
如果线程没有被中断,这里在等待线程被唤醒后会返回false,然后interrupted 的值会变成true。被唤醒后的线程会重新进入acquireQueued方法中去尝试获取许可。
这里思考一下,什么时候这个线程会被唤醒呢?我们将会在接下来篇幅中进行分析。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这个方法对于公平锁与非公平锁来说就是tryAcquire方法的不同。
public boolean tryLock() { return sync.nonfairTryAcquire(1); }
逻辑与非公平锁的tryAcquire方法相同,它会直接尝试用cas去获取锁所有权,成功则返回true,否则返回false。
锁的释放对于公平锁和非公平锁来说也是一样的。
锁释放调用代码
public void unlock() { sync.release(1); }
release方法:
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ //用于排他模式的release操作,会在unlock时被调用 public final boolean release(int arg) { if (tryRelease(arg)) { // 当tryRelease成功 Node h = head; // 当头节点不为null,且waitStatus不为初始状态时 if (h != null && h.waitStatus != 0) // 唤醒离h节点最近的符合unparkSuccessor方法中条件的节点 unparkSuccessor(h); return true; } return false; }
锁释放的方法java.util.concurrent.locks.ReentrantLock.Sync#tryRelease:
@ReservedStackAccess protected final boolean tryRelease(int releases) { // 当前锁的状态值去掉要释放的锁的许可数 int c = getState() - releases; // 如果来释放锁的线程不是锁的拥有者则抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // c=0 表示锁已经全部释放 free = true; // 锁的拥有者置为null setExclusiveOwnerThread(null); } // 将当前state置为c setState(c); return free; }
释放锁的操作主要是重置state的值和当前排他锁的拥有者线程。
这里我们先来看下唤醒上面park状态线程的部分在代码中的调用主要地方java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor方法,我们来找一下这个方法被 调用的地方:
继续分析unparkSuccessor方法:
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) // 将waitStatus小于0节点的waitStatus置为0 node.compareAndSetWaitStatus(ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next;//获取当前节点的next节点 if (s == null || s.waitStatus > 0) {// s为空表示node为tail节点;s.waitStatus > 0表示s处于canceled状态 s = null; // 从tail节点开始向前遍历,一直到找到排在最前面没有处于canceled状态的节点,并将它赋值给s for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) // 如果s.thread已经处于block状态,这个方法会唤醒s.thread;否则它的作用是下一次s.thread调用LockSupport.park(s.thread)时将不会阻塞 LockSupport.unpark(s.thread); }
doReleaseShared方法在Semaphore和CountDownLatch和ReentrantReadWriteLock中有使用,release方法上面已经分析过,这里篇幅问题我们主要分析一下cancelAcquire方法:
/** * Cancels an ongoing attempt to acquire. * * @param node the node */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; // 既然取消了就不需要线程对象再去获取许可了 node.thread = null;
// Skip cancelled predecessors // 跳过已经处于canceled状态的前置节点,和shouldParkAfterFailedAcquire方法中类似 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary, although with // a possibility that a cancelled node may transiently remain // reachable. //predNext节点是很明显需要取消连接的节点。如果不这样做,下面的所有CAS操作都会失败,这样,我们竞争不过其它的cancel或者是signal操作,因此无需采取进一步的措施,尽管处于canceled模式的节点暂时处于可达状态。 Node predNext = pred.next;
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. //这里无条件地直接将waitStatus状态置为CANCELLED,而不是通过cas。完成这一基本步骤后,其他节点可以跳过我们。在些之前,我们不受其他线程的干扰 node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves. // 如果当前节点是tail节点,则移除自己,并将它的前置节点设置为tail,这时它的前置节点的next节点置为null(因为要移除的它自已是尾结点了,它的前置节点的后置节点自然就为null了) if (node == tail && compareAndSetTail(node, pred)) { pred.compareAndSetNext(predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; // 进入这里表示node节点不是tail节点,那么它的next节点就不为null // 如果它的前置节点不是头节点,并且前置节点的waitStatus处于SIGNAL状态或者 // ws<=0表示节点waitStatus值为SIGNAL = -1或者初始值或者CONDITION = -2或者PROPAGATE = -3,此时将此前置节点的waitStatus设置为SIGNAL状态 // 并且当前前置节点的线程不为null if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) { // 获取到当前节点的next节点(因为不是tail节点,所以是有next节点的) Node next = node.next;
// 在next != null并且next.waitStatus值为SIGNAL = -1或者初始值或者CONDITION = -2或者PROPAGATE = -3时(也就是不为canceled状态时)将pred节点的next指向当前节点的next节点。 if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next); } else { // 如果当前节点为head节点,那么证明它是当前锁的拥有者 unparkSuccessor(node); }
node.next = node; // help GC } }
注:java.util.concurrent.CyclicBarrier也是在ReentrantLock基础上实现的,与ReentrantLock相关的Condition,将在下一篇中进行分析。