通过lock
与unlock
研究ReentrantLock相关实现的原理 :
state
变量标志当前资源占用ExclusiveThread
标志当前占用CPU的线程LockSupport.park/unpark
来申请和释放CPU资源Unsafe.compareAndSwapObject
来完成CAS操作Unsafe.putObject
来完成原子操作state
变量 state
(0-->1) , 如果设置成功 , 则独占该锁addWaiter
创建本线程的Node , 插入队列中park
来让出当前线程的CPUReentrantLock.lock()
时 // 以非公平锁为例
static final class NonfairSync extends Sync {
// 获取锁
final void lock() {
// 通过CAS操作设置state的状态
if (compareAndSetState(0, 1))
// 如果设置成功的话 , 则代表当前独占锁的线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 尝试获取锁
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
// 重写子类的Acquire , 调用nonfairTryAcquire
return nonfairTryAcquire(acquires);
}
}
acquire
中 , 会获取锁 , 或者插入等待队列中public final void acquire(int arg) {
// tryAcquire尝试获取锁 , 如果获取失败的话 ,
// 则通过acquireQueued将当前线程添加到队列中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果成功的话 , 则中断当前线程 , 仅仅只是中断
selfInterrupt();
}
tryAcquire
中 , 会尝试获取锁 final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取当前State状态
int c = getState();
if (c == 0) {
// 如果当前State为0的话 , 则独占该锁 ,并且返回true,不需要插入队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 如果当前线程就是独占的线程的话 , 则将State加一 , 并且返回true
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter(Node.EXCLUSIVE)
创建Node , 由于是通过Unsafe.putObject
, 所以也是保证线程间的可见性的. private Node addWaiter(Node mode) {
// 通过构造函数 , 将Node.EXCLUSIVE作为nextWaiter放入该节点
Node node = new Node(mode);
for (;;) {
// 找到尾节点 , head/tail都是volatile的
Node oldTail = tail;
// 尾节点不为空 , 双线链表增加尾节点操作
if (oldTail != null) {
// 将tail节点放到node.prev中
U.putObject(node, Node.PREV, oldTail);
// 通过cas操作 , 设置tail
if (compareAndSetTail(oldTail, node)) {
// 如果设置成功的话 , 则将tail.next设置成该节点
oldTail.next = node;
// 返回node
return node;
}
} else {
// 如果没有头尾节点 , 则创建head/tail节点
initializeSyncQueue();
}
}
}
shouldParkAfterFailedAcquire
返回true , 则会Park当前线程shouldParkAfterFailedAcquire
返回false , 就会一直自旋遍历前驱节点 , 直到前驱接节点为空 , 则将当前节点设置为head节点 , 可以获取当前锁final boolean acquireQueued(final Node node, int arg) {
// 其实node就是尾节点
try {
boolean interrupted = false;
for (;;) {
// 从尾节点往前遍历 , 获取其前驱
final Node p = node.predecessor();
// 如果遍历到了head节点 , 并且获取成功了, 则返回false
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
// shouldParkAfterFailedAcquire:判断在acquire后,是否应该park让出CPU
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
shouldParkAfterFailedAcquire
主要根据前驱节点状态判断当前节点是否需要Park让出CPUNode.SIGNAL
的话 , 则返回true , park当前线程PROPAGATE
的话 , 则将前驱节点的ws设置成SIGNAL private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前驱节点的waitStatus为SIGNAL,就代表前驱节点可以被唤醒
// 返回true , park当前线程
return true;
if (ws > 0) {
// 如果ws>0, 则代表前驱节点cancelled了 , 则过滤掉canceld节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 删除掉cancedld节点
pred.next = node;
} else {
// 如果ws为0或者PROPAGATE的话 , 则将前驱节点的waitStatus设置成SIGNAL , 但是先不park
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
当某个线程在使用完锁后 , 使用unlock
来释放锁资源.
unlock
也就是释放1
的资源public void unlock() {
sync.release(1);
}
release
会先尝试释放锁 , 如果成功 ,则unpark
前驱线程public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Sync
中的tryRelease
中 , 会释放相关资源 , 并且返回锁是否释放成功protected final boolean tryRelease(int releases) {
// 将当前申请资源数 - 要释放的数量
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 如果当前线程不是独占线程的话 , 则抛出异常
throw new IllegalMonitorStateException();
// 是否释放锁
boolean free = false;
if (c == 0) {
// 如果当前没有资源占用的话 , 则认为可以释放锁
free = true;
// 将独占线程设为空
setExclusiveOwnerThread(null);
}
// 设置当前锁所对应的资源数量
setState(c);
// 返回当前锁是否会被释放
return free;
waitStatus
是否不为0 , 由于lock
阶段会清除所有的Cancel
节点 , 所以此处判断不为0 , 则代表前驱节点可以获取资源占有锁. 于是 , 调用unparkSuccessor
让前驱节点获取节点private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果waitStatus小于0 ,
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
// 开始寻找可执行的前驱节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
// unpark对应的线程
LockSupport.unpark(s.thread);
}
对于公平锁(FairSync
)而言在加锁的过程中会有所不同 , 仅仅只是在申请锁的时候 , 加入了队列的判断 , 如果头节点有后继节点的话 , 则让后继节点获取CPU
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 判断队列中是否有后继节点
if (!hasQueuedPredecessors() &&
// 如果没有前驱的话 , 则会通过cas来设置state
compareAndSetState(0, acquires)) {
// 设置当前线程独占该锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 返回头尾节点不相等 , 并且头节点的next不为空 , 头节点的线程不是当前线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}