ReentrantLock是 java提供代码层面的锁,和synchronized关键字相同。为什么在用提供了 synchronized关键字后,还提供了ReentrantLock等代码层面的锁API,首先在synchronized关键字刚推出是,性能方面差很多,直到后面的版本中推出了锁升级的概念,在性能上有所好转。更重要的也是,JUC的包里面,提供的API更加灵活,符合生产环境各种需求。
分析 ReentrantLock 有如下几个特点: 1、互斥 2、重入 3、等待唤醒 4、存储一系列的等待线程 FIFO 先进先出 5、公平/非公平
public class ReenTrantLockDemo extends Thread{
//模拟优惠卷
private static List<Integer> array = new ArrayList<>();
private static Lock lock = new ReentrantLock();
public Integer get(){
lock.lock();
try {
Integer o= array.get(0);
array.remove(o);
return o;
}catch (Exception e){
System.out.println("获取出错");
}finally {
lock.unlock();
}
return -1;
}
@Override
public void run() {
Integer a = get();
System.out.println("获取到的优惠卷编号为"+a);
}
public static void main(String[] args) {
for(int i=0;i<10;i++){
array.add(i);
}
ReenTrantLockDemo reenTrantLockDemo = new ReenTrantLockDemo();
ExecutorService service = Executors.newCachedThreadPool();
ReenTrantLockDemo demo = new ReenTrantLockDemo();
for (int i = 0; i < 10; i++) {
service.submit(demo);
}
service.shutdown();
}
}
结果为
获取到的优惠卷编号为3
获取到的优惠卷编号为2
获取到的优惠卷编号为6
获取到的优惠卷编号为5
获取到的优惠卷编号为1
获取到的优惠卷编号为0
获取到的优惠卷编号为4
获取到的优惠卷编号为8
获取到的优惠卷编号为7
获取到的优惠卷编号为9
获取没有出错,每一个都有获取到正确的编号,不重复
本篇分析源码的思路为 构造函数 -> lock/unlock -> condition(await-signal) 尽量简化的方式来介绍
// 不带参数的,默认创建非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据参数创建非公平和公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
这里的FairSync 是什么,看一下类关系图:
Sync 继承了AQS,抽象同步队列。在JUC的包里面,AQS可以说是一个爷爷级别的存在,会有很多类,依赖AQS去实现。这里有AQS中,有2个重要关注的点来看 1、java.util.concurrent.locks.AbstractQueuedSynchronizer#state 2、java.util.concurrent.locks.AbstractQueuedSynchronizer.Node 3、java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
要完成互斥的功能,需要一个共同去抢占的值,这里采用了AQS中的state,当线程获取锁时,成功修改state值的线程,获得了锁,可以执行程序,此时也会记录当前线程,在此线程内,需要再次获得这个锁的时候,就避免了再次抢占的操作。故如下
当然在修改state时,使用了cas修改,保证原子性
当一个线程获取锁时,其他线程处于等待状态,需要这样的容器来存储这些数据,所有有了Node,一个Node代表一个线程(除头结点外) Node属性
其中waitStatus较为复杂,其他是双向链表的标准属性。
// 取消
static final int CANCELLED = 1;
// 等待被唤醒 也就是等待触发状态,前节点可能是head或者前节点为取消状态CANCELLED
static final int SIGNAL = -1;
// 等待条件状态,在等待队列中
static final int CONDITION = -2;
// 向后传播,贡献模式下使用
static final int PROPAGATE = -3;
这里使用非公平锁来进行探讨 获取锁的代码如下:
final void lock() {
// cas 修改state的值
if (compareAndSetState(0, 1))
// 记录当前获取锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
//获得锁 当抢夺锁失败后,进入这里
acquire(1);
}
cas修改值,修改成功,记录当前的线程,这个都好理解。难以理解的是acquire(1)
,当看这一部分源码时,要带入一个场景,就是锁已经被一个线程抢占了,未抢占的线程才会进入下面的方法中。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 执行到此处停止了
selfInterrupt();
}
分为如下3个部分来看:
// 尝试获取锁
tryAcquire(arg) A
acquireQueued() B
addWaiter(Node.EXCLUSIVE) C
先从到第一个方法来看 由于案例都是非公平锁,所以源码如下
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 此时上一个线程刚好释放
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 同一个线程需要获取锁,重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
刚好执行到了此处,上一个线程释放了锁。或者,当前线程再次获取锁,属于重入的情况。只有这2种,情况下,执行到A处,就返回了,不会继续往下。
假如继续往下走,会先加入同步队列节点:
addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 尾插法
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 把node加入队列中
enq(node);
return node;
}
// 讲node插入队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 加入一个空节点,作为头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在抢占锁失败后,需要加入同步队列,双向链表,节点是上面提过的Node。在空队列情况下,再插入第一个节点时,会先创建一个空节点作为头节点。 分为2种情况如下,
可以看到在尾部插入节点时,都是通过,先改变prev指针,再CAS改变tail节点。保证了
node.prev = t;
compareAndSetTail(t, node)
t.next = node;
这里会有疑问,为什么不是先改变next指针,如果先改变了next指针,cas后,还需要再次设置一次next,同时在通过head节点 往next查找时,会寻找到错误的节点。 当然在后面的源码中,会发现经常使用tail和prev指针来寻找。
获取队列
// 这里的Node是刚刚addWaiter里面加入的新的node节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标志位
boolean interrupted = false;
// 自旋操作 使用自旋不停的去尝试获取锁
for (;;) {
// 获取前置结点
final Node p = node.predecessor();
// 如果前置节点就是头结点,则尝试获取锁资源
if (p == head && tryAcquire(arg)) {
// 设置头结点 头结点就表示当前正占有锁资源的节点
setHead(node);
// 把原来的头结点减少引用,帮助进入GC被回收
p.next = null; // help GC
failed = false;
// 向上传递是否中断标志
return interrupted;
}
// 字面意思,在尝试获取锁失败后,是否应该挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如上面注释的解释,再加入同步队列后,线程自旋去抢占资源,再适当时挂起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前置节点的waitStatus是Node.SIGNAL则返回true,然后会执行parkAndCheckInterrupt()方法进行挂起
return true;
if (ws > 0) {
//大于0,都是 取消状态
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//这里我们由当前节点的前置节点开始,一直向前找最近的一个没有被取消的节点 ,把当前节点,放在找出来的节点后面
pred.next = node;
} else {
//根据waitStatus的取值限定,这里waitStatus的值只能是0或者PROPAGATE,那么我们把前置节点的waitStatus设为Node.SIGNAL然后重新进入该方法进行判断
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
改变前一个节点的状态为SIGNAL,然后自己挂起。当确定要挂起时,调用parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//被唤醒之后,返回中断标记,即如果是正常唤醒则返回false,如果是由于中断醒来,就返回true
return Thread.interrupted();
}
这里会传递线程中断标志到最上层,也就是,判断当前线程释放需要中断。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 执行到此处中断线程了
selfInterrupt();
}
如果有异常,则会调用cancelAcquire
自此,整个lock源码流程结束。
unlock 会简单很多,因为执行unlock是在单线程环境下执行的。如下
public void unlock() {
sync.release(1);
}
// tryRelease释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 队列中有正在等待的节点
if (h != null && h.waitStatus != 0)
//唤醒头结点的下一个节点
unparkSuccessor(h);
return true;
}
return false;
}
// 释放锁,由于重入的特性,当state为0时,才会去释放当前线程持有
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
至此分析了,lock()和unlock() 方法,也提到了同步队列。后续还会带来锁中的等待队列