本文为何适原创并发编程系列第 17 篇,文末有本系列文章汇总。
通过以下几部分来分析Java提供的读写锁ReentrantReadWriteLock:
本文涉及到上下文联系较多,经常需要上下滑动查看,篇幅太多很不方便,而且文章太长阅读体验也不好,所以分成读写锁(上)和读写锁(下)两篇。本文是上篇,只写到“源码分析读锁的获取与释放”。
在并发编程中解决线程安全的问题,通常使用的都是java提供的关键字synchronized或者重入锁ReentrantLock。它们都是独占式获取锁,也就是在同一时刻只有一个线程能够获取锁。
但是在大多数场景下,大部分时间都是读取共享资源,对共享资源的写操作很少。然而读服务不存在数据竞争问题,如果一个线程在读时禁止其他线程读势必会导致性能降低。
针对这种读多写少的情况,java还提供了另外一个实现Lock接口的ReentrantReadWriteLock(读写锁)。读写锁允许共享资源在同一时刻可以被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。
直接上代码:
public class ReadWriteLockTest {
public static void main(String[] args) {
final Data data = new Data();
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
data.get();
}
}
}, "读锁线程-" + i).start();
}
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
data.put(new Random().nextInt(10000));
}
}
}, "写锁线程-" + i).start();
}
}
}
class Data {
private Object data = 0;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
public void get() {
rwl.readLock().lock();// 上读锁,其他线程只能读不能写
System.out.println(Thread.currentThread().getName() + " 开始读取数据");
try {
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName() + " 读取数据完成 " + data);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwl.readLock().unlock(); // 释放读锁
}
}
public void put(Object data) {
rwl.writeLock().lock();// 上写锁,不允许其他线程读也不允许写
System.out.println(Thread.currentThread().getName() + " 开始写数据");
try {
Thread.sleep((long) (Math.random() * 1000));
this.data = data;
System.out.println(Thread.currentThread().getName() + " 写数据完成 " + data);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rwl.writeLock().unlock();// 释放写锁
}
}
}
结果:
读锁线程-0 开始读取数据
读锁线程-1 开始读取数据
读锁线程-2 开始读取数据
读锁线程-0 读取数据完成 0
读锁线程-1 读取数据完成 0
读锁线程-2 读取数据完成 0
写锁线程-0 开始写数据
写锁线程-0 写数据完成 4306
...
写锁线程-1 开始写数据
写锁线程-1 写数据完成 9114
...
写锁线程-2 开始写数据
写锁线程-2 写数据完成 7709
Data类的共享数据data,get()方法上读锁读data,put()方法上写锁写data。启动3个线程读data,3个线程写data。
从结果可以看出,读锁是共享的,读锁的三个线程是同时读取共享数据data的;写锁是互斥的,写锁的三个线程是依次写共享数据data的。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// 属性
private final ReentrantReadWriteLock.ReadLock readerLock; // 读锁
private final ReentrantReadWriteLock.WriteLock writerLock; // 写锁
final Sync sync; // 锁的主体AQS
// 内部类
abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class FairSync extends Sync {}
static final class NonfairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {}
// 构造
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
}
ReentrantReadWriteLock与ReentrantLock一样,其锁主体依然是Sync,读写锁其实就是两个属性:readerLock、writerLock。
一个ReentrantReadWriteLock对象都对应着读锁和写锁两个锁,而这两个锁是通过同一个sync(AQS)实现的。
我们知道AQS.state使用来表示同步状态的。ReentrantLock中,state=0表示没有线程占用锁,state>0时state表示线程的重入次数。但是读写锁ReentrantReadWriteLock内部维护着两个锁,需要用state这一个变量维护多种状态,应该怎么办呢?
读写锁采用“按位切割使用”的方式,将state这个int变量分为高16位和低16位,高16位记录读锁状态,低16位记录写锁状态,并通过位运算来快速获取当前的读写锁状态。
abstract static class Sync extends AbstractQueuedSynchronizer {
// 将state这个int变量分为高16位和低16位,高16位记录读锁状态,低16位记录写锁状态
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/**
* 获取读锁的状态,读锁的获取次数(包括重入)
* c无符号补0右移16位,获得高16位
*/
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/**
* 获取写锁的状态,写锁的重入次数
* c & 0x0000FFFF,将高16位全部抹去,获得低16位
*/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
线程获取写锁后,和重入锁一样,将AQS.exclusiveOwnerThread置为当前线程。但是读锁是共享的,可以多个线程同时获取读锁,那么如何记录获取读锁的多个线程以及每个线程的重入情况呢?
sycn中提供了一个HoldCounter类,类似计数器,用于记录一个线程读锁的重入次数。将HoldCounter通过ThreadLocal与线程绑定。
源码如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
// 这个嵌套类的实例用来记录每个线程持有的读锁数量(读锁重入)
static final class HoldCounter {
int count = 0;// 读锁重入次数
final long tid = getThreadId(Thread.currentThread());// 线程 id
}
// ThreadLocal 的子类
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 组合使用上面两个类,用一个 ThreadLocal 来记录当前线程持有的读锁数量
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;// 记录"最后一个获取读锁的线程"的读锁重入次数,用于缓存提高性能
private transient Thread firstReader = null;// 第一个获取读锁的线程(并且其未释放读锁)
private transient int firstReaderHoldCount;// 第一个获取读锁的线程重入的读锁数量
}
注:属性cachedHoldCounter、firstReader、firstReaderHoldCount都是为了提高性能,目前不用太关注。
(ThreadLocal在之后的文章中会专门讲解)
线程与HoldCounter的存储结构如下图:
查看使用示例中代码rwl.readLock().lock()的实现
/**
* rwl.readLock().lock()-->ReadLock.lock()
*/
public void lock() {
sync.acquireShared(1);
}
/**
* ReadLock.lock()-->AQS.acquireShared(int)
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared()
:doAcquireShared():尝试获取读锁,获取到锁返回1,获取不到返回-1。
首先来分析一下可以获取读锁的条件:
简单总结,只有在其它线程持有写锁时,不能获取读锁,其它情况都可以去获取。
看源码:
/**
* 尝试获取读锁,获取到锁返回1,获取不到返回-1
*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
/*
* 根据锁的状态判断可以获取读锁的情况:
* 1. 读锁写锁都没有被占用
* 2. 只有读锁被占用
* 3. 写锁被自己线程占用
* 总结一下,只有在其它线程持有写锁时,不能获取读锁,其它情况都可以去获取。
*/
if (exclusiveCount(c) != 0 && // 写锁被占用
getExclusiveOwnerThread() != current) // 持有写锁的不是当前线程
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() && // 检查AQS队列中的情况,看是当前线程是否可以获取读锁,下文有详细讲解。
r < MAX_COUNT && // 读锁的标志位只有16位,最多之能有2^16-1个线程获取读锁或重入
compareAndSetState(c, c + SHARED_UNIT)) {// 在state的第17位加1,也就是将读锁标志位加1
/*
* 到这里已经获取到读锁了
* 以下是修改记录获取读锁的线程和重入次数,以及缓存firstReader和cachedHoldCounter
*/
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
/*
* 到这里
* 没有获取到读锁,因为上面代码获取到读锁的话已经在上一个if里返回1了
* 锁的状态是满足获取读锁的,因为不满足的上面返回-1了
* 所以没有获取到读锁的原因:AQS队列不满足获取读锁条件,或者CAS失败,或者16位标志位满了
* 像CAS失败这种原因,是一定要再尝试获取的,所以这里再次尝试获取读锁,fullTryAcquireShared()方法下文有详细讲解
*/
return fullTryAcquireShared(current);
}
readerShouldBlock()
:readerShouldBlock():检查AQS队列中的情况,看是当前线程是否可以获取读锁,返回true表示当前不能获取读锁。
分别看下公平锁和非公平锁的实现。
公平锁FairSync:
对于公平锁来说,如果队列中还有线程在等锁,就不允许新来的线程获得锁,必须进入队列排队。
hasQueuedPredecessors()方法在重入锁的文章中分析过,判断同步队列中是否还有等锁的线程,如果有其他线程等锁,返回true当前线程不能获取读锁。
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
非公平锁NonfairSync:
对于非公平锁来说,原本是不需要关心队列中的情况,有机会直接尝试抢锁就好了,这里问什么会限制获取锁呢?
这里给写锁定义了更高的优先级,如果队列中第一个等锁的线程请求的是写锁,那么当前线程就不能跟那个马上就要获取写锁的线程抢,这样做很好的避免了写锁饥饿。
/**
* 队列中第一个等锁的线程请求的是写锁时,返回true,当前线程不能获取读锁
*/
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
// 返回true-队列中第一个等锁的线程请求的是写锁
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() && // head后继节点线程请求写锁
s.thread != null;
}
fullTryAcquireShared()
tryAcquireShared()方法中因为CAS抢锁失败等原因没有获取到读锁的,fullTryAcquireShared()再次尝试获取读锁。此外,fullTryAcquireShared()还处理了读锁重入的情况。
/**
* 再次尝试获取读锁
*/
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {// 注意这里是循环
int c = getState();
if (exclusiveCount(c) != 0) {
// 仍然是先检查锁状态:在其它线程持有写锁时,不能获取读锁,返回-1
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
/*
* exclusiveCount(c) == 0 写锁没有被占用
* readerShouldBlock() == true,AQS同步队列中的线程在等锁,当前线程不能抢读锁
* 既然当前线程不能抢读锁,为什么没有直接返回呢?
* 因为这里还有一种情况是可以获取读锁的,那就是读锁重入。
* 以下代码就是检查如果不是重入的话,return -1,不能继续往下获取锁。
*/
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS修改读锁标志位,修改成功表示获取到读锁;CAS失败,则进入下一次for循环继续CAS抢锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
/*
* 到这里已经获取到读锁了
* 以下是修改记录获取读锁的线程和重入次数,以及缓存firstReader和cachedHoldCounter
*/
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
doAcquireShared()
再回到最开始的acquireShared(),tryAcquireShared()抢锁成功,直接返回,执行同步代码;如果tryAcquireShared()抢锁失败,调用doAcquireShared()。
doAcquireShared()应该比较熟悉了吧,类似AQS那篇中分析过acquireQueued():
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);// 把当前线程构造成节点,Node.SHARED表示共享锁
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {// 前驱节点是head,node才能去抢锁
int r = tryAcquireShared(arg);// 抢锁,上文分析了
if (r >= 0) {// r>0表示抢锁成功
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断node前驱节点状态,将当前线程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate()
试想一种情况:当线程1持有写锁时,线程2、线程3、线程4、线程5...来获取读锁是获取不到的,只能排进同步队列。当线程1释放写锁时,唤醒线程2来获取锁。因为读锁是共享锁,当线程2获取到读锁时,线程3也应该被唤醒来获取读锁。
setHeadAndPropagate()方法就是在一个线程获取读锁之后,唤醒它之后排队获取读锁的线程的。该方法可以保证线程2获取读锁后,唤醒线程3获取读锁,线程3获取读锁后,唤醒线程4获取读锁,直到遇到后继节点是要获取写锁时才结束。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);// 因为node获取到锁了,所以设置node为head
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())// node后继节点线程要获取读锁,此时node就是head
doReleaseShared();// 唤醒head后继节点(也就是node.next)获取锁
}
}
理解了上文读锁的获取过程,读锁的释放过程不看源码也应该可以分析出来:
使用示例中释放读锁代码 rwl.readLock().unlock()
/**
* rwl.readLock().unlock()-->ReadLock.unlock()
*/
public void unlock() {
sync.releaseShared(1);
}
/**
* sync.releaseShared(1)-->AQS.releaseShared(int)
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {// 当前线程释放读锁,下文介绍
/*
* 到这里,已经没有任何线程占用锁,调用doReleaseShared()唤醒之后获取写锁的线程
* 如果同步队列中还有线程在排队,head后继节点的线程一定是要获取写锁,因为线程持有读锁时会把它之后要获取读锁的线程全部唤醒
*/
doReleaseShared();// 唤醒head后继节点获取锁
return true;
}
return false;
}
/**
* 释放读锁
* 当前线程释放读锁之后,没有线程占用锁,返回true
*/
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 处理firstReader、cachedHoldCounter、readHolds获取读锁线程及读锁重入次数
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;// state第17位-1,也就是读锁状态标志位-1
if (compareAndSetState(c, nextc))// CAS设置state,CAS失败自旋进入下一次for循环
return nextc == 0;// state=0表示没有线程占用锁,返回true
}
}
大多数业务场景,都是读多写少的,采用互斥锁性能较差,所以提供了读写锁。读写锁允许共享资源在同一时刻可以被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。
一个ReentrantReadWriteLock对象都对应着读锁和写锁两个锁,而这两个锁是通过同一个sync(AQS)实现的。
读写锁采用“按位切割使用”的方式,将state这个int变量分为高16位和低16位,高16位记录读锁状态,低16位记录写锁状态。
读锁获取时,需要判断当时的写锁没有被其他线程占用即可,锁处于的其他状态都可以获取读锁。