jdk提供
synchronized
实现线程同步,但有些场景下并不灵活,如多个同步方法,每次只能有一个线程访问;而Lock则可以非常灵活的在代码中实现同步机制
在之前学习阻塞队列中,较多地方使用
ReadWriteLock
,Condition
,接下来在探究实现原理之前,先研究下锁的使用
Lock 接口的定义
public interface Lock {
// 获取锁,若当前lock被其他线程获取;则此线程阻塞等待lock被释放
// 如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁
void lock();
// 获取锁,若当前锁不可用(被其他线程获取);
// 则阻塞线程,等待获取锁,则这个线程能够响应中断,即中断线程的等待状态
void lockInterruptibly() throws InterruptedException;
// 来尝试获取锁,如果获取成功,则返回true;
// 如果获取失败(即锁已被其他线程获取),则返回false
// 也就是说,这个方法无论如何都会立即返回
boolean tryLock();
// 在拿不到锁时会等待一定的时间
// 等待过程中,可以被中断
// 超过时间,依然获取不到,则返回false;否则返回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 返回一个绑定该lock的Condtion对象
// 在Condition#await()之前,锁会被该线程持有
// Condition#await() 会自动释放锁,在wait返回之后,会自动获取锁
Condition newCondition();
}
ReentrantLock
可重入锁。jdk中ReentrantLock是唯一实现了Lock接口的类 可重入的意思是一个线程拥有锁之后,可以再次获取锁,
最基本的使用场景,就是利用lock和unlock来实现线程同步
以轮班为实例进行说明,要求一个人下班之后,另一个人才能上班,即不能两个人同时上班,具体实现可以如下
public class LockDemo {
private Lock lock = new ReentrantLock();
private void workOn() {
System.out.println(Thread.currentThread().getName() + ":上班!");
}
private void workOff() {
System.out.println(Thread.currentThread().getName() + ":下班");
}
public void work() {
try {
lock.lock();
workOn();
System.out.println(Thread.currentThread().getName()
+ "工作中!!!!");
Thread.sleep(100);
workOff();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
LockDemo lockDemo = new LockDemo();
int i = 0;
List<Thread> list = new ArrayList<>(30);
do {
Thread a = new Thread(new Runnable() {
@Override
public void run() {
lockDemo.work();
}
}, "小A_" + i);
Thread b = new Thread(new Runnable() {
@Override
public void run() {
lockDemo.work();
}
}, "小B_" + i);
list.add(a);
list.add(b);
} while (i++ < 10);
list.parallelStream().forEach(Thread::start);
Thread.sleep(3000);
System.out.println("main over!");
}
}
上面的示例,主要给出了lock()
和 unlock()
的配套使用,当一个线程在上班迁尝试获取锁,如果获取到,则只有在下班之后才会释放锁,保证在其上班的过程中,不会有线程也跑来上岗抢饭碗,输出如下
小A_3:上班!
小A_3工作中!!!!
小A_3:下班
小A_1:上班!
小A_1工作中!!!!
小A_1:下班
// .... 省略部分
小B_7:上班!
小B_7工作中!!!!
小B_7:下班
小B_5:上班!
小B_5工作中!!!!
小B_5:下班
main over!
从基本的使用中,确定lock的使用姿势一般如下:
Lock lock = new ReentrantLock()
lock.lock()
尝试获取锁,若被其他线程占用,则阻塞lock.unlock()
; 一般来讲,把释放锁的逻辑,放在需要线程同步的代码包装外的finally
块中即常见的使用姿势应该是
try {
lock.lock();
// .....
} finally {
lock.unlock();
}
一个疑问,若没被加锁,仅只执行lock.unlock()
是否会有问题?
测试如下
@Test
public void testLock() {
Lock lock = new ReentrantLock();
// lock.lock();
// lock.unlock();
lock.unlock();
System.out.println("123");
}
执行之后,发现抛了异常(去掉上面的注释,即加锁一次,释放锁两次也会抛下面异常)
另一个疑问,代码块中多次上锁,释放锁只一次,是否会有问题?
将前面的TestDemo方法稍稍改动一下
public void work() {
try {
lock.lock();
workOn();
lock.lock();
System.out.println(Thread.currentThread().getName()
+ "工作中!!!!");
Thread.sleep(100);
workOff();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
再次执行,发现其他线程都无法再次获取到锁了,运行的gif图如下
因此可以得出结论:
lock()
,lock()
连续调用的情况,即两者之间没有释放锁unlock()
的显示调用在JDK的阻塞队列中,很多地方就利用了Condition和Lock来实现出队入队的并发安全性,以 ArrayBlockingQueue
为例
内部定义了锁,非空条件,非满条件
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
// ... 省略
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
出队,入队的实现如下(屏蔽一些与锁无关逻辑)
// 入队逻辑
public void put(E e) throws InterruptedException {
// ...
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// 如果队列已满,则执行Condtion notFull的等待方法
// 本线程会释放锁,等待其他线程出队之后,执行 notFull.singal()方法
notFull.await();
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}
private void enqueue(E x) {
// 入队,notEmpty 条件执行,唤醒被 notEmpty.await() 阻塞的出队线程
notEmpty.signal();
}
// 出队
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 队列为空,线程执行notEmpty.wait(),阻塞并释放锁
// 等待其他入队线程执行 notEmpty.signal(); 后被唤醒
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// ...
// 出队,notFull 条件执行,唤醒被 notFull.await() 阻塞的入队线程
notFull.signal();
}
下面看下Condition的定义
public interface Condition {
// 使当前线程处于等待状态,释放与Condtion绑定的lock锁
// 直到 singal()方法被调用后,被唤醒(若中断,就game over了)
// 唤醒后,该线程会再次获取与条件绑定的 lock锁
void await() throws InterruptedException;
// 相比较await()而言,不响应中断
void awaitUninterruptibly();
// 在wait()的返回条件基础上增加了超时响应,返回值表示当前剩余的时间
// < 0 ,则表示超时
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 同上,只是时间参数不同而已
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 同上,只是时间参数不同而已
boolean awaitUntil(Date deadline) throws InterruptedException;
// 表示条件达成,唤醒一个被条件阻塞的线程
void signal();
// 唤醒所有被条件阻塞的线程。
void signalAll();
}
通过上面的注释,也就是说Condtion一般是与Lock配套使用,应用在多线程协同工作的场景中;即一个线程的执行,期望另一个线程执行完毕之后才完成
针对这种方式,我们写个测试类,来实现累加,要求如下:
上面这种情况下,线程3的执行,要求线程1和线程2都执行完毕
说明:下面实现只是为了演示Condition和Lock的使用,上面这种场景有更好的选择,如Thread.join()或者利用Fork/Join都更加优雅
public class LockCountDemo {
private int start = 10;
private int middle = 90;
private int end = 200;
private volatile int tmpAns1 = 0;
private volatile int tmpAns2 = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private AtomicInteger count = new AtomicInteger(0);
private int add(int i, int j) {
try {
lock.lock();
int sum = 0;
for (int tmp = i; tmp < j; tmp++) {
sum += tmp;
}
return sum;
} finally {
atomic();
lock.unlock();
}
}
private int sum() throws InterruptedException {
try {
lock.lock();
condition.await();
return tmpAns1 + tmpAns2;
} finally {
lock.unlock();
}
}
private void atomic() {
if (2 == count.addAndGet(1)) {
condition.signal();
}
}
public static void main(String[] args) throws InterruptedException {
LockCountDemo demo = new LockCountDemo();
Thread thread1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() +
" : 开始执行");
demo.tmpAns1 = demo.add(demo.start, demo.middle);
System.out.println(Thread.currentThread().getName() +
" : calculate ans: " + demo.tmpAns1);
}, "count1");
Thread thread2 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() +
" : 开始执行");
demo.tmpAns2 = demo.add(demo.middle, demo.end + 1);
System.out.println(Thread.currentThread().getName() +
" : calculate ans: " + demo.tmpAns2);
}, "count2");
Thread thread3 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() +
" : 开始执行");
int ans = demo.sum();
System.out.println("the total result: " + ans);
} catch (Exception e) {
e.printStackTrace();
}
}, "sum");
thread3.start();
thread1.start();
thread2.start();
Thread.sleep(3000);
System.out.println("over");
}
}
输出如下
sum : 开始执行
count2 : 开始执行
count1 : 开始执行
count1 : calculate ans: 3960
the total result: 20055
count2 : calculate ans: 16095
over
小结Condition的使用:
Lock#newConditin()
进行实例化Condition#await()
会释放lock,线程阻塞;直到线程中断or Condition#singal()
被执行,唤醒阻塞线程,并重新获取lockAQS是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题
AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
//取值为 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一
volatile int waitStatus;
volatile Node prev;
volatile Node next;
// Link to next node waiting on condition,
// or the special value SHARED
volatile Thread thread;
Node nextWaiter;
}
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
源码分析:
tyrAcquire: 尝试获取锁,非阻塞立即返回
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 没有线程占用锁
if (compareAndSetState(0, acquires)) {
// 占用锁成功,设置为独占
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 本线程之前已经获取到锁
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新重入次数
setState(nextc);
return true;
}
return false;
}
非公平锁tryAcquire的流程是:
addWaiter
: 入队
private Node addWaiter(Node mode) {
// 创建一个节点,并将其挂在链表的最后
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
enq(node);
return node;
}
acquireQueued
挂起
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// /标记线程是否被中断过
boolean interrupted = false;
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
// 若该节点为有效的队列头(head指向的Node内部实际为空)
// 尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node); // 获取成功,将当前节点设置为head节点
p.next = null; // help GC
failed = false;
//返回是否被中断过
return interrupted;
}
// 判断获取失败后是否可以挂起,若可以则挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 线程若被中断,设置interrupted为true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
线程挂起的逻辑
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //前驱节点的状态
if (ws == Node.SIGNAL) // 前驱节点状态为signal,返回true
return true;
if (ws > 0) {
// 从队尾向前寻找第一个状态不为CANCELLED的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前驱节点的状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
线程入队后能够挂起的前提是,它的前驱节点的状态为SIGNAL,它的含义是“Hi,前面的兄弟,如果你获取锁并且出队后,记得把我唤醒!”。
shouldParkAfterFailedAcquire
会先判断当前节点的前驱状态是否符合要求:
小结下lock()流程
acquire()
acquireQueued
, 挂起之前,会先尝试获取锁,值有确认失败之后,则挂起锁,并设置前置Node的状态为SIGNAL(以保障在释放锁的时候,可以保证唤醒Node的后驱节点线程)尝试释放锁,成功,需要清楚各种状态(计数,释放独占锁)
此外还需要额外判断队列下个节点是否需要唤醒,然后决定唤醒被挂起的线程;
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
// 查看头结点的状态是否为SIGNAL,如果是则唤醒头结点的下个节点关联的线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 计算释放后state值
if (Thread.currentThread() != getExclusiveOwnerThread())
// 如果不是当前线程占用锁,那么抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 锁被重入次数为0,表示释放成功,清空独占线程
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
Lock lock = new ReentrantLock()
lock.lock()
尝试获取锁,若被其他线程占用,则阻塞lock.unlock()
; 一般来讲,把释放锁的逻辑,放在需要线程同步的代码包装外的finally
块中lock()
,lock()
连续调用的情况,即两者之间没有释放锁unlock()
的显示调用Lock#newConditin()
进行实例化Condition#await()
会释放lock,线程阻塞;直到线程中断or Condition#singal()
被执行,唤醒阻塞线程,并重新获取lockReentrantLock#lock
的流程图大致如下