https://zhuanlan.zhihu.com/p/249147493
ReentrantLock底层使用了CAS+AQS队列实现
CAS是一种无锁算法。有3个操作数:内存值V、旧的预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
do{
备份旧数据;
基于旧数据构造新数据;
}while(!CAS( 内存地址,备份的旧数据,新数据 ))
该操作是一个原子操作,被广泛的应用在Java的底层实现中。
在Java中,CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现。
CAS速度非常快:
CAS仍然可能有消耗:可能出现cache miss的情况,会有更大的cpu时间消耗。
AQS是一个用于构建锁和同步容器的框架。
AQS使用一个FIFO的队列(也叫CLH队列,是CLH锁的一种变形),表示排队等待锁的线程。队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus。结构如下图所示:
首先ReentrantLock默认是非公平锁,也可以指定为公平锁
ReentrantLock的2个构造函数
public ReentrantLock() {
sync = new NonfairSync(); //默认,非公平
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync(); //根据参数创建
}
①ReentrantLock先通过CAS尝试获取锁,
②如果此时锁已经被占用,该线程加入AQS队列并wait();
③当前驱线程的锁被释放,挂在CLH队列为首的线程就会被notify(),然后继续CAS尝试获取锁,
此时:
非公平锁,如果有其他线程尝试lock(),有可能被其他刚好申请锁的线程抢占。
公平锁,只有在CLH队列头的线程才可以获取锁,新来的线程只能插入到队尾。
ReentrantLock中的公平锁和非公平锁的实现,其中tryAcquire是公平锁和非公平锁实现的区别,下面的两种类型的锁的tryAcquire的实现,从中我们可以看出在公平锁中,每一次的tryAcquire都会检查CLH队列中是否仍有前驱的元素,如果仍然有那么继续等待,通过这种方式来保证先来先服务的原则;而非公平锁,首先是检查并设置锁的状态,这种方式会出现即使队列中有等待的线程,但是新的线程仍然会与排队线程中的对头线程竞争(但是排队的线程是先来先服务的),所以新的线程可能会抢占已经在排队的线程的锁,这样就无法保证先来先服务,但是已经等待的线程们是仍然保证先来先服务的,所以总结一下公平锁和非公平锁的区别:
1、公平锁能保证:老的线程排队使用锁,新线程仍然排队使用锁。 2、非公平锁保证:老的线程排队使用锁;但是无法保证新线程抢占已经在排队的线程的锁。
直接调用CAS。如果成功通过CAS修改了state,指定当前线程为该锁的独占线程,标志自己成功获取锁。
如果CAS失败的话,调用acquire();
final void lock() { //非公平锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final void lock() { //公平锁
acquire(1);
}
调用acquire()函数:
首先,调用tryAcquire(),会尝试再次通过CAS修改state为1。
如果失败而且发现锁是被当前线程占用的,就执行重入(state++);
如果锁是被其他线程占有,那么当前线程执行tryAcquire返回失败,并且执行addWaiter()进入等待队列,并挂起自己interrupt()。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里需要注意,公平锁和非公平锁的区别所在:调用lock()函数的时候,
非公平直接采用cas去获取锁,如果失败再去调用acquire(1),acquire(1)再调用tryAcquire();
公平锁则会先调用acquire(1),acquire(1)再调用tryAcquire()。
公平锁的tryAcquire
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// !hasQueuedPredecessors()保证了不论是新的线程还是已经排队的线程都顺序使用锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平锁的
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 新的线程可能抢占已经排队的线程的锁的使用权
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
检查state字段?(state字段看做是每一个ReentrantLock的状态,若为0则不自由(被线程持有);否则自由)
若为0,表示锁未被占用。然后通过 !hasQueuedPredecessors()先判断自己是不是队头的节点(或者说判断自己是否需要排队)?如果不需要排队,再调用compareAndSetState(0, acquires)来通过cas去获取锁;
若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数。
如果以上两点都没有成功,则获取锁失败,返回false。
protected final boolean tryAcquire(int acquires) { //注意:这是公平的tryAcquire()
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//需要先判断自己是不是队头的节点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
final boolean tryAcquire(int acquires) { //注意:这是非公平的tryAcquire()
//获取当前线程
final Thread current = Thread.currentThread();
//获取state变量值
int c = getState();
if (c == 0) { //没有线程占用锁
//没有 !hasQueuedPredecessors() 判断,不考虑自己是不是在队头,直接申请锁
if (compareAndSetState(0, acquires)) {
//占用锁成功,设置独占线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) { //当前线程已经占用该锁
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新state值为新的重入次数
setState(nextc);
return true;
}
//获取锁失败
return false;
这里需要注意,公平锁和非公平锁的区别:
虽然他们的acquire(1)都调用tryAcquire(),但是公平锁的tryAcquire()与非公平锁的tryAcquire()不同,非公平锁没有 !hasQueuedPredecessors() 判断,不考虑自己是不是在队头,直接申请锁。
当前线程加入AQS双向链表队列。
写入之前需要将当前线程包装为一个 Node 对象(addWaiter(Node.EXCLUSIVE))。
首先判断队列是否为空,不为空时则将封装好的 Node 利用 CAS 写入队尾;
如果队列为空(尾结点为null)或者出现并发写入失败的话,就需要调用 enq(node) 来写入了。
/**
* 将新节点和当前线程关联并且入队列
* @param mode 独占/共享
* @return 新节点
*/
private Node addWaiter(Node mode) {
//初始化节点,设置关联线程和模式(独占 or 共享)
Node node = new Node(Thread.currentThread(), mode);
// 获取尾节点引用
Node pred = tail;
// 尾节点不为空,说明队列已经初始化过
if (pred != null) {
node.prev = pred;
// 设置新节点为尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq()的处理逻辑就相当于自旋
加上CAS
保证一定能写入队列。
写入队列后,需要挂起当前线程
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先会根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取成功就万事大吉了。
如果不是头节点,或者获取锁失败,则会根据上一个节点的 waitStatus 状态来处理(shouldParkAfterFailedAcquire(p, node))。
waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。
shouldParkAfterFailedAcquire(p, node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt():
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
他是利用 LockSupport
的 part
方法来挂起当前线程的,直到被唤醒。
释放时候,state--,通过state==0判断锁是否完全被释放。
成功释放锁的话,唤起一个被挂起的线程
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒被挂起的线程
unparkSuccessor(h);
return true;
}
return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
参考:https://blog.csdn.net/java_lyvee/article/details/98966684
队列存储内容:Node节点
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
交替执行:单个线程在获取锁--其实和队列无关,因为通过jdk级别(cas)就可以解决同步问题
竞争执行:多个线程再竞争同一把锁--需要用到aqs队列。
说到AQS联想到:{
自旋
park与unpark
cas
}.
t1占用锁还未释放后,t2的竞争流程:
注意点:AQS的对头head只想的节点Node,其属性Thread一定为空。
调用addWaiter方法
维护链表时,初始化时先初始化一个Thread属性为空的Node,设置为AQS的头和尾部;
然后通过自旋+CAS将t2插入队尾,修改tail指向为t2。
然后调用acquireQueued方法。
首先判断自己的上一个节点是不是头部节点,如果是再调用tryAcquire去尝试获取锁。
如果获取成功就返回并设置当前线程为持有锁的线程。
如果失败,就调用shouldParkAfterFailedAcquire(p, node)方法,前一个节点的waitStatus设为-1,并且返回false;
然后循环调用,再次自旋获取一个锁,然后继续调用shouldParkAfterFailedAcquire(p, node)方法,如果前一个节点为-1则返回true。(这里需要注意的是,在入队过程中还是会自旋两次去拿锁,分别是addWaiter和acquireQueued中的自旋获取锁)
然后调用parkAndCheckInterrupt方法。
利用 LockSupport
的 part
方法来挂起当前线程的,直到被唤醒。
扩充:reentrantlock.lockInterruptbly()
这个锁方法,比原来的lock方法多了一个用处,就是当t1拿到锁微释放时,t2在队列等待挂起,在主线程中可以使用t2.interrupt()方法去打断t2,不让其一直挂起,做出响应。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。