AbstractQueuedSynchronizer 就是那个大名鼎鼎的 AQS,是java.util.concurrent包下同步器的核心。
使用队列的方式来解决n个线程来争夺m把锁的问题,每当一个新的线程需要获取锁,为其创建一个节点并放到队尾,如果该线程是队列中的第一个节点,则节点的locked设置成false,如果它不是队列的第一个节点,则它的节点的prev指向原来的队尾节点,并不断自旋查看prev指向节点的locked属性,如果该值变为false,表示轮到它来尝试获取锁了,如果获取成功并最终用完释放后,则将自己的locked设置成false,如果获取失败,locked值不变,还是true,并不断尝试获取锁。
也就是说,每个节点只需要关心前置节点的locked状态,可以发现CLH实现锁的获取是公平的。
Node节点维护了双向节点和当前节点状态和线程引用。
static final class Node {volatile Node prev;volatile Node next;volatile Thread thread;volatile int waitStatus; // SIGNAL CANCELLED CONDITION PROPAGATECANCELLED,值为1,表示当前的线程被取消;
SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
值为0,表示当前节点在sync队列中,等待着获取锁。
}
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
需要在锁定时,需要维护一个状态(int类型),而对状态的操作是原子和非阻塞的,通过同步器提供的对状态访问的方法对状态进行操纵,并基于Unsafe的原子操作来修改state的状态,compareAndSet来确保原子性的修改。
获取/设置当前的同步状态:
protected final int getState() { return state;
}protected final void setState(int newState) {
state = newState;
}
对同步状态对修改采用原子操作:
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
入队操作,如果队列为空则实例化节点,否则插入队尾:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure
Node pred = tail; if (pred != null) {
node.prev = pred; if (compareAndSetTail(pred, node)) {
pred.next = node; return node;
}
}
enq(node); return node;
}
获取锁: 需要获取当前节点的前驱节点,并且头结点能够获取状态,代表能够占有锁。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false; return interrupted;
} if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally { if (failed)
cancelAcquire(node);
}
}
这个方法在非公平实现中,主要是通过AQS的state来检查和维护锁状态,如果state是0,说明没有线程占有这个锁,如果不为0并且锁的占有线程是当前线程,则是重入的情况,均可以获得锁并修改state值。
如果是首次获得锁,则设置锁占有线程为当前线程。
当然,如果前面两种情况都不满足,说明尝试获得锁失败,需要做前面段落所述的队列操作,创建一个等待结点并进入循环,循环中的park()调用挂起当前线程。
否则将当前线程挂起:
LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); return Thread.interrupted();
}
public final boolean release(int arg) { if (tryRelease(arg)) {
Node h = head; if (h != null && h.waitStatus != 0)
unparkSuccessor(h); return true;
} return false;
}
如果修改state值成功,则找到队列中应该唤起的结点,对节点中的线程调用unpark()方法,恢复线程执行。