接下来几篇文章会对JUC并发包里面的锁工具类做下梳理,如:ReentrantLock、
ReentrantReadWriteLock、StampedLock
Lock 用于解决互斥问题,即同一时刻只允许一个线程访问共享资源;Condition 用于解决同步问题(即线程之间如何通信、协作)
ReentrantLock 类结构:
class Sync extends AbstractQueuedSynchronizer
class NonfairSync extends Sync
class FairSync extends Sync
AbstractQueuedSynchronizer 类结构:
Condition 类结构:
当使用Lock来保证线程同步时,需使用Condition条件变量来使线程保持协调。Condition实例被绑定在一个Lock的对象上,使用Lock对象的方法newCondition()获取Condition的实例。Condition提供了下面三种方法,来协调不同线程的同步:
synchronized与ReentrantLock比较:
java.util.concurrent.locks.ReentrantLock#lock
public void lock() {
// sync,是构造器传入的,支持非公平模式、公平模式
sync.lock();
}
java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
final void lock() {
// 借助 unsafe 原子性对 state 加1。如果初始值为0,表示没有线程占有锁
if (compareAndSetState(0, 1))
// 设置当前线程独占锁
setExclusiveOwnerThread(Thread.currentThread());
else
// 尝试获取锁
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取当前 AQS 内部状态量
int c = getState();
// 0 表示无线程占有,直接用 CAS 修改
if (c == 0) {
// 不检查排队情况,直接争抢
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入锁情况
else if (current == getExclusiveOwnerThread()) {
// state 计数增加
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果前面的 tryAcquire 失败,代表着锁争抢失败,进入排队竞争阶段
// 当前线程被包装成EXCLUSIVE排他模式的节点,通过addWaiter方法添加到队列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 无限循环
for (;;) {
// 当前节点的前一个节点
final Node p = node.predecessor();
// 如果前一个节点是头结点,表示当前节点适合去 tryAcquire
if (p == head && tryAcquire(arg)) {
// if 获取成功,设置当前节点为头节点,出队列
setHead(node);
// 将前面节点对当前节点的引用清空
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果返回true,需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 借助sun.misc.Unsafe#park 执行阻塞
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
// 普通挂起。直到另一个持有锁线程释放锁后,触发下一个线程的 sun.misc.Unsafe#unpark
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
public void unlock() {
sync.release(1);
}
}