java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
/**
* waitStatus value to indicate thread has cancelled
* 表示线程获取锁的请求已经取消了
*/
static final int CANCELLED = 1;
/**
* waitStatus value to indicate successor's thread needs unparking
* 表示线程已经准备好了,就等资源释放了
*/
static final int SIGNAL = -1;
/**
* waitStatus value to indicate thread is waiting on condition
* 表示节点在等待队列中,节点线程等待唤醒
*/
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should unconditionally propagate
* 当前线程处在SHARED情况下,该字段才会使用
*/
static final int PROPAGATE = -3;
// 当前节点在队列中的状态
volatile int waitStatus;
// 链表的前一个节点
volatile Node prev;
// 链表的下一个节点
volatile Node next;
// 处于该节点的线程
volatile Thread thread;
// 指向下一个处于condition状态的节点
Node nextWaiter;
predecessor();
public void lock() {
sync.lock();
}
java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 在等待锁期间如果线程的中断标识为被设置为true,则设置当前线程的中断标识位为true
// todo 本身已经设置为true了,这里有必要再次设置吗???
selfInterrupt();
}
AQS中的模板方法NonfairSync与FairSync中重写了
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter
prev(previous) 前一个,当前节点的前一个节点(链表结构,物理结构)
pred(predecessor)符合一定条件的前驱节点(逻辑结构,有可能是链表的前一个节点,也有可能是链表上忽略的不满足条件的几个节点后的某一个节点)
Creates and enqueues node for current thread and given mode.
用当前线程和给定的模式创建节点,并将节点入队
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 尝试快捷入队;如果失败,则走完整的入队逻辑。
// 为什么说是快捷入队呢,因为完整的入队逻辑中还包含初始化首节点的逻辑
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
进入队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 创建空节点,表示当前正在持有锁的线程
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
设置前驱节点(predecessor)的waitStatus为-1(waitStatus value to indicate successor’s thread needs unparking 指示后继线程需要解除阻塞状态)
阻塞当前线程
返回等待锁期间当前线程的状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 尝试抢锁是因为非公平的情况下后来的线程在未入队前也会抢一下
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 自旋过程中,判断当前线程是否需要阻塞 && 阻塞当前线程并且检验线程中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
/**
* 阻塞当前线程,等待持有锁的线程唤醒
* 在获取锁期间线程的中断标识为是否被设置为了true
*/
parkAndCheckInterrupt()){
interrupted = true;
}
}
} finally {
if (failed){
cancelAcquire(node);
}
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
* 设置前一个节点的状态为就绪
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 线程在此阻塞,等待持有锁的线程释放锁唤醒
LockSupport.park(this);
return Thread.interrupted();
}
java.util.concurrent.locks.ReentrantLock#unlock
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0){
compareAndSetWaitStatus(node, ws, 0);
}
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev){
if (t.waitStatus <= 0){
s = t;
}
}
}
if (s != null){
LockSupport.unpark(s.thread);
}
}