与 synchronized 一样,都支持可重入,但相对于 synchronized 它还具备如下特点
// 获取锁,需成对出现,释放锁放在finally
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁。
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
加锁时线程t1调用reentrantLock.lockInterruptibly()
方法表示自己申请的是可打断锁,如果其他线程拥有了这把锁,为了防止线程1无限等待下去,可以在其他线程中调用t1.interrupt()
打断t1线程的等待状态,让线程t1抛出InterruptedException
异常,退出等待状态。
立即失败
某线程调用lock.tryLock()
尝试获取锁,如果没有获取成功,则放弃获取,如果获取了那么就往下执行。
超时失败
某线程调用lock.tryLock(1, TimeUnit.SECONDS)
,在1s的时间内如果能够获取到锁就往下执行,如果没有就放弃获取。
ReentrantLock 默认是不公平的,意思就是当一个线程释放锁之后,处于阻塞状态的线程并不是获取锁的先后顺序来获得锁的。
创建对象时可以使用带参构造器new ReentrantLock(false)
实现公平锁,公平锁一般没有必要,会降低并发度。
synchronized 中也有一个条件变量,Monitor中的waitSet,当条件不满足时进入 waitSet 等待。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的
使用要点:
例子
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
// 没有烟,等待
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
// 唤醒
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();}
}
默认为非公平锁实现
public ReentrantLock(){
sync = new NonfairSync();
}
1 2 3
NonfairSync继承自AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
没有竞争时
第一个竞争者出现时
Thread-1执行了
当前线程会进入acquireQueue逻辑
再次有多个线程经历上述过程竞争失败,变成这个样子
Thread-0释放锁,进入tryRelease流程,如果成功
如果加锁成功(没有竞争),会设置
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先
大致流程
加锁
解锁
如果此时其他线程打断正在阻塞的线程,那么该线程会抛出异常,从而退出死循环等待获得锁。
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获得到锁, 进入 (一)
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// (一) 可打断的获取锁流程
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 处于阻塞队列中的线程都运行到这个死循环,如果前驱结点是head,那么该线程
// 在锁被释放的时候有能力去竞争锁,这里是可打断的,如果某个线程被打断,就能够
// 抛出异常,从而退出死循环
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt 会进入此
// 这时候抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
每个条件变量其实就对应着一个等待队列,类似与Monitor中的waitSet,但ReentrantLock支持多个条件变量,其实现类是ConditionObject。
开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程
创建新的Node状态为-2(Node.CONDITION),关联Thread-0,加入等待队列尾部
接下来进入 AQS 的 fullyRelease(将state置0) 流程,释放同步器上的锁
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
park 阻塞 Thread-0
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
假设 Thread-1 要来唤醒 Thread-0
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1
每个对象都可以用继承自Object的wait/notify方法来实现等待/通知机制。而Condition接口也提供了类似Object监视器的方法,通过与Lock配合来实现等待/通知模式。两者的区别如下:
对比项 | Object监视器 | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock获取锁,调用Lock.newCondition获取Condition对象 |
调用方式 | 直接调用,比如object.notify() | 直接调用,比如condition.await() |
等待队列的个数 | 一个 | 多个 |
当前线程释放锁进入等待状态 | 支持 | 支持 |
当前线程释放锁进入等待状态,在等待状态中不中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态直到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |