//无参构造方法调用有参构造方法,并且传入一个参数为false
public ReentrantReadWriteLock() {
this(false);//----->跟进去
}
//fair为false,表示创建一个非公平的AQS
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
//创建一个ReadLock,并且把this(ReentrantReadWriteLock)当做参数传入
readerLock = new ReadLock(this);//----->跟进去
writerLock = new WriteLock(this);
}
//参数lock为上面创建的ReentrantReadWriteLock
protected ReadLock(ReentrantReadWriteLock lock) {
//此时把ReentrantReadWriteLock里的属性sync引用赋值给ReadLock里的属性sync
sync = lock.sync;
}
到这其实构造方法就已经看完了,此处想表达的一点是:(ReentrantReadWriteLock里的属性sync)、(reentrantReadWriteLock.readLock的属性sync)和(reentrantReadWriteLock.writeLock的属性sync)是同一个sync(extends AbstractQueuedSynchronizer)
A获取读锁后,A可以获取读锁
A获取读锁后,A不可以获取写锁
A获取写锁后,A可以获取读锁
A获取写锁后,A可以获取写锁
A获取读锁后,B可以获取读锁
A获取读锁后,B不可以获取写锁
A获取写锁后,B不可以获取读锁
A获取写锁后,B不可以获取写锁
综上:在不考虑重入的情况下,如果A获得读锁,那么其他线程可以获得读锁,但是不能获取写锁(阻塞);如果A获得写锁,那么其他线程读写锁都获取不到(阻塞)
AQS 的状态state是32位(int 类型)的,辦成两份,读锁用高16位,表示持有读锁的线程数(sharedCount),写锁低16位,表示写锁的重入次数 (exclusiveCount)。状态值为 0 表示锁空闲,sharedCount不为 0 表示分配了读锁,exclusiveCount 不为 0 表示分配了写锁,sharedCount和exclusiveCount 一般不会同时不为 0,只有当线程占用了写锁,该线程可以重入获取读锁,反之不成立。
//高低16位,这个就是一常量
static final int SHARED_SHIFT = 16;
//上读锁需要在高16位添加1,也就是需要添加一个17位(1后面16个0)的数
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//读锁或者写锁运行最大的上锁数,以写锁为例,最大是16位,那么写锁的最大值为就是15个1
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//state变量 & 这个值就能获得低16的数值
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
举一个例子
初始化state = 0,其中1-16号代码写锁,17-18号代表读锁
在state=0的基础之上加一个写锁,其实就是state+1
在state=0的基础之上加一个读锁,其实就是state+65536(二进制是1后面16个0)
获取读锁
把state向右移16位,如下图所示,就能获得读锁的数量,
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
获取写锁 ,state & EXCLUSIVE_MASK 就能获取第16位的值
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
参考:【Java并发编程】ReentrantReadWriteLock源码及实现原理分析_fxkcsdn的博客-CSDN博客_reentrantreadwritelock原理
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
//在没有读锁和写锁的情况下申请读锁
reentrantReadWriteLock.readLock().lock();//----->跟lock方法
//释放读锁
reentrantReadWriteLock.readLock().unlock();
public void lock() {
sync.acquireShared(1);//----->跟进去,参数为1
}
public final void acquireShared(int arg) {//arg=1
if (tryAcquireShared(arg) < 0)//----->跟进去,arg=1
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
//获取当前线程
Thread current = Thread.currentThread();
//获取AQS里的属性state,初始化为0
int c = getState();
//exclusiveCount(c)就是根据C获取写锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以exclusiveCount(c)==0
//sharedCount(c)就是根据C获取读锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以sharedCount(c)==0
exclusiveCount(c) != 0
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)getExclusiveOwnerThread()记录上读锁的线程,如果没有读锁,返回null
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&//readerShouldBlock()此方法不讲
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//用CAS方式给state的高16位加1
if (r == 0) {
//略。。。
} else if (firstReader == current) {
//略。。。
} else {
//略。。。
}
return 1;//当前情况下最终的返回结果
}
return fullTryAcquireShared(current);
}
public final void acquireShared(int arg) {//arg=1
if (tryAcquireShared(arg) < 0)//----->跳出来,此时tryAcquireShared(1)=1,所以不执行doAcquireShared方法
doAcquireShared(arg);
}
此时上读锁成功
多线程断点调试
new Thread(() -> {
reentrantReadWriteLock.readLock().lock();
try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
System.out.println("t1 logic ");
reentrantReadWriteLock.readLock().unlock();
}, "t1").start();
new Thread(() -> {
reentrantReadWriteLock.readLock().lock();
try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
System.out.println("t2 logic ");
reentrantReadWriteLock.readLock().unlock();
}, "t2").start();
public void lock() {
sync.acquireShared(1);//----->跟进去,参数为1
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//----->跟进去,arg = 1
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
//获取当前线程
Thread current = Thread.currentThread();
//我们的假设的是原来有一把读锁,那么c=state=65536
int c = getState();
//exclusiveCount(c)就是根据C获取写锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以exclusiveCount(c)==0
//sharedCount(c)就是根据C获取读锁的数量,因为我们的前提是在锁没有读锁和写锁的情况下申请读锁,所以sharedCount(c)==0
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&//readerShouldBlock()此方法不讲
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//用CAS方式给state的高16位加1
if (r == 0) {
//略。。。
} else if (firstReader == current) {
//略。。。
} else {
//略。。。
}
return 1;//最后返回的结果
}
return fullTryAcquireShared(current);
}
public final void acquireShared(int arg) {//arg=1
if (tryAcquireShared(arg) < 0)//----->跳出来,此时tryAcquireShared(1)=1,所以不执行doAcquireShared方法
doAcquireShared(arg);
}
此时上读锁成功
多线程断点调试
new Thread(() -> {
reentrantReadWriteLock.writeLock().lock();
try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
System.out.println("t1 logic ");
reentrantReadWriteLock.writeLock().unlock();
}, "t1").start();
new Thread(() -> {
reentrantReadWriteLock.readLock().lock();//----->跟进去,lock方法
try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
System.out.println("t2 logic ");
reentrantReadWriteLock.readLock().unlock();
}, "t2").start();
public void lock() {
sync.acquireShared(1);----->跟进去,参数为1
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)----->跟进去,arg=1
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
//获得当前线程
Thread current = Thread.currentThread();
//获得state的值
int c = getState();
if (exclusiveCount(c) != 0 //假设条件为有读锁,所以exclusiveCount(c) != 0 为真
&&
getExclusiveOwnerThread() != current) //AQS会记录申请成功的读锁的线程,如果申请写锁成功的线程再次申请读锁(可重入),getExclusiveOwnerThread() != current 为假, 否则为真,此处为真。
return -1;//最后的返回结果
//略
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//----->跳出来,tryAcquireShared(arg)=-1
doAcquireShared(arg);//----->跟进去,arg=1
}
//把当前线程封装到node节点中,并且插入到AQS的链表尾,再一次尝试获取锁,如果获取成功,把当前节点设置为头节点;否则就告诉当线程挂起。
private void doAcquireShared(int arg) {
//给AQS里的链表添加节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获得当前节点的前驱节点
final Node p = node.predecessor();
if (p == head) {//如果是头节点
//尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {//如果获取成功
setHeadAndPropagate(node, r);//设置当前节点为头节点,并且唤醒后继的读节点
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;//方法的唯一出口,但是你要明白中间是当前线程是有挂起的
}
}
if (shouldParkAfterFailedAcquire(p, node)//设置node的prev节点的一个参数,告诉prev需要唤醒node
&&
parkAndCheckInterrupt())//调用LockSupport.park(thred)挂起线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
现在为止,其实就该线程就已经挂起了(并且把当前线程存到node节点中),因为前面有写锁,所以不能获取锁。等写锁释放后就会调用LockSupport.unpark()继续运行上面代码的线程。
reentrantReadWriteLock.readLock().lock();
try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } finally { }
System.out.println("t1 logic ");
reentrantReadWriteLock.readLock().unlock();----->跟进去.unlock()方法
public void unlock() {
sync.releaseShared(1);//----->跟进去,参数为1
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//----->跟进去,参数为1
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
//获取当前线程
Thread current = Thread.currentThread();
if (firstReader == current) {
//略。。。
} else {
//略。。。
}
for (;;) {
//获得state的数据
int c = getState();
//释放读锁就是在高16位减1,即c-65536
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))//CAS方式设置state的新值
//nextc即state的值,state==0,表明无读锁和无写锁
return nextc == 0;
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//----->跳出来,如果state==0,执行下面方法运行其它线程加锁,否则就不执行
doReleaseShared();//如果state==0,唤醒AQS里链表里的后继节点
return true;
}
return false;
}
此时读锁释放锁成功,其实逻辑很简单,用CAS算法给state的高16位减1,如果state==0,唤醒后继节点,否则什么也不做。
此处就像上面的写的那样了,就简单介绍几个方法
public final void acquire(int arg) {//arg=1
if (!tryAcquire(arg) //尝试获取锁,如果获取成功,返回true;!tryAcquire(arg)返回false
&&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//只有当tryAcquire(arg)返回false,j即申请锁失败才执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
//获取当前线程
Thread current = Thread.currentThread();
//获得state的值
int c = getState();
//获取写锁的数量
int w = exclusiveCount(c);
if (c != 0) {//说明有读锁或者写锁,此时不能申请写锁
// 如果写锁为0(说明有读锁)||写锁不为0(说明有写锁),getExclusiveOwnerThread()返回null或者申请写锁成功的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 走到这说明写锁重入
setState(c + acquires);
return true;
}
//如果c==0,说明既没有写锁也没有读锁
if (writerShouldBlock() //返回false,写死的
||
!compareAndSetState(c, c + acquires))//用CAS设置state的状态
return false;
setExclusiveOwnerThread(current);//设置申请成功写锁的线程
return true;
}
private Node addWaiter(Node mode) {
//把当前线程封装到node节点中
Node node = new Node(Thread.currentThread(), mode);
//并且把node设置为尾结点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);//走到这说明tail为空,说明链表为空,需要初始化链表并且插入当前node节点
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//第一遍循环走if,把head设置为new Node())
if (compareAndSetHead(new Node()))
tail = head;
} else {
//第二遍循环走else,node设置为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
此处就像上面的写的那样了,就简单介绍几个方法
public final boolean release(int arg) {//arg=1
if (tryRelease(arg)) {//释放锁,如果当前写锁释放成功,并且没有重入写锁,说明state的高16位为0,可以唤醒后继节点
Node h = head;
if (h != null && h.waitStatus != 0)//如果符合条件
unparkSuccessor(h);//唤醒h的里后继节点
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//如果记录的申请写锁的线程和释放写锁的线程不一致,抛异常
throw new IllegalMonitorStateException();
//释放写锁就是减一
int nextc = getState() - releases;
//如果此时写锁为0,free返回true,否则返回false
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);//没有写锁了就把申请写锁成功的线程设置为null
//更新state的数值
setState(nextc);
return free;
}
0)本文写的真烂(自评)
1)核心类AbstractQueuedSynchronizer (AQS)
2)高低16位的真正含义
3)看源码要跟源码,而不是傻傻的看
4)向这种要多线程运行看,读读申请写,读申请写,写申请读,写申请写等等
5)可重入锁要考虑
6)理解AQS里的链表节点的添加删除是一个核心中的核心
7)释放写锁成功后会唤醒后继节点;申请读锁成功后会唤醒后继节点(如果后继节点是读线程)