本文为何适原创并发编程系列第 15 篇,文末有本系列文章汇总。
AQS是java.util.concurrent
包的核心基础组件,是实现Lock的基础。那么AQS是如何实现Lock的呢?
ReentrantLock是Lock中用到最多的,与synchronized具有相同的功能和内存语义,本文将从源码角度深入分析AQS是如何实现ReentrantLock的。
注:本文是在默认理解AQS原理基础上分析ReentrantLock的,建议读者先读懂上一篇AQS原理。
用法:
public class ReentrantLockDemo {
private static ReentrantLock reentrantLock = new ReentrantLock();
public void createOrder() {
reentrantLock.lock();// 获取锁
try {
// 同步代码
} finally {
reentrantLock.unlock();// 释放锁
}
}
}
需要注意两点:
使用举例:
synchronized文章中讲到的线程安全问题,代码如下:
public class ReentrantLockTest {
public int inc = 0;
public void increase() {
inc++;
}
public static void main(String[] args) {
final ReentrantLockTest test = new ReentrantLockTest();
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
for (int j = 0; j < 1000; j++)
test.increase();
};
}.start();
}
while (Thread.activeCount() > 1)
// 保证前面的线程都执行完
Thread.yield();
System.out.println(test.inc);
}
}
目的是得到test.inc
=10000,但是因为线程安全问题,最终的结果总是小于10000。
使用synchronized解决办法是,用synchronized修饰increase()方法。同样可以使用重入锁解决,代码如下:
public class ReentrantLockTest {
private ReentrantLock reentrantLock = new ReentrantLock();
public int inc = 0;
public void increase() {
reentrantLock.lock();// 加锁
inc++;
reentrantLock.unlock();// 解锁
}
public static void main(String[] args) {
final ReentrantLockTest test = new ReentrantLockTest();
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
for (int j = 0; j < 1000; j++)
test.increase();
};
}.start();
}
while (Thread.activeCount() > 1)
// 保证前面的线程都执行完
Thread.yield();
System.out.println(test.inc);
}
}
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class FairSync extends Sync {}
static final class NonfairSync extends Sync {}
}
ReentrantLock用内部类Sync来管理锁,所以真正的获取锁和释放锁是由Sync的实现类来控制的。
Sync有两个实现,分别为NonfairSync(非公平锁)和FairSync(公平锁),以FairSync为例来讲解ReentrantLock,之后会专门分析公平锁和非公平锁。
ReentrantLock分为公平锁和非公平锁,本文以公平锁为例讲解,下一篇将详细介绍公平锁与非公平锁。本文的源码讲解方式依然是在代码中适当位置加入注释。
/**
* 获取锁reentrantLock.lock()-->ReentrantLock.lock()
*/
public void lock() {
sync.lock();
}
/**
* ReentrantLock.lock()-->ReentrantLock.FairSync.lock()
*/
final void lock() {
acquire(1);
}
/**
* ReentrantLock.FairSync.lock()-->AbstractQueuedSynchronizer.acquire(int)
* 很熟悉了吧,上一篇讲的AQS获取锁的方法
* 1.当前线程通过tryAcquire()方法抢锁
* 2.线程抢到锁,tryAcquire()返回true,完成。
* 3.线程没有抢到锁,将当前线程封装成node加入同步队列,并将当前线程挂起,等待被唤醒之后再抢锁。
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* ReentrantLock.FairSync.tryAcquire(int)
* 实现了AQS的抢锁方法,抢锁成功返回true
* 获取锁成功的两种情况:
* 1.没有线程占用锁,且AQS队列中没有其他线程等锁,且CAS修改state成功。
* 2.锁已经被当前线程持有,直接重入。
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();// AQS的state (FairSync extends Sync extends AQS)
if (c == 0) {// state==0表示当前没有线程占用锁
if (!hasQueuedPredecessors() && // AQS同步队列中没有其他线程等锁的话,当前线程可以去抢锁,此方法下文有详解
compareAndSetState(0, acquires)) {// CAS修改state,修改成功表示获取到了锁
setExclusiveOwnerThread(current);// 抢锁成功将AQS.exclusiveOwnerThread置为当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
/*
* AQS.exclusiveOwnerThread是当前线程,表示锁已经被当前线程持有,这里是锁重入
* 重入一次将AQS.state加1
*/
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* AbstractQueuedSynchronizer.hasQueuedPredecessors()
* 判断AQS同步队列中是否还有其他线程在等锁
* 返回true表示当前线程不能抢锁,需要到同步队列中排队;返回false表示当前线程可以去抢锁
* 三种情况:
* 1.队列为空不需要排队, head==tail,直接返回false
* 2.head后继节点的线程是当前线程,就算排队也轮到当前线程去抢锁了,返回false
* 3.其他情况都返回true,不允许抢锁
*/
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && // head==tail时队列是空的,直接返回false
((s = h.next) == null || s.thread != Thread.currentThread());// head后继节点的线程是当前线程,返回false
}
sync.state>0
且sync.exclusiveOwnerThread==当前线程
。四、释放锁
/**
* 释放锁reentrantLock.unlock()-->ReentrantLock.unlock()
*/
public void unlock() {
sync.release(1);
}
/**
* ReentrantLock.unlock()-->AbstractQueuedSynchronizer.release(int)
* 同样是上一篇AQS中的释放锁方法
* 释放锁成功之后,唤醒head的后继节点next,next节点被唤醒后再去抢锁。
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* AbstractQueuedSynchronizer.release(int)-->ReentrantLock.Sync.tryRelease(int)
* 释放重入锁。只有锁彻底释放,其他线程可以来竞争锁才返回true
* 锁可以重入,state记录锁的重入次数,所以state可以大于1
* 每执行一次tryRelease()将state减1,直到state==0,表示当前线程彻底把锁释放
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
state用于记录线程状态:state==0,没有线程占用该锁;state==1,一个线程持有该锁;state==n,一个线程持有该锁且重入了n次。
重入锁实现重入过程
重入锁实现同步过程:
线程最终获取到锁的标志就是AQS.state>0
且AQS.exclusiveOwnerThread==当前线程
。
Lock和AQS很好的隔离了使用者和实现者所需关注的领域。