private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public static class ReadLock public static class WriteLock
内部的Sync中有如下定义。
c指的AbstractQueuedSynchronizer的state。SHARED_UNIT为65536。
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
读锁调用的AbstractQueuedSynchronizer中acquireShared,方法的实现在读锁内部,方法内部一段代码注释帮助大家理解
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
我们将焦点聚集在tryAcquireShared上。
第一次加锁时state为0,exclusiveCount和sharedCount(c)也是0,经过compareAndSetState(c, c + SHARED_UNIT),c变为65536
如果相同线程再次加读锁,则c的值再次加65536,firstReaderHoldCount变为2
假设此刻c为65536,不同线程尝试加读锁时,sharedCount(c)计算如下为例,将65536表示为32位二进制
00000000000000010000000000000000
无符号右移16位
00000000000000000000000000000001
对应int值1,表示当前读锁加锁数量。此时代码进入
else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; }
创建HoldCounter,并将count设置为1,线程号为当前线程的Id。HoldCounter可以理解为某线程加锁的次数。
如果线程因为并发原因导致无法进入if块,比如在CAS c时失败,就进入fullTryAcquireShared,代码和上述大同小异,主要是进入 for (;;){}循环尝试不停加锁。
如果因为别的线程加了写锁,则因为如下代码,加锁失败。
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
纵观try的过程,两个返回值1代表加锁成功,-1代表加锁失败。加锁失败时就会执行doAcquireShared,将当前线程作为一个节点加入双向链表。
方法的具体代码和之前的很相似。区别主要有两点:1.Node的类型为SHARED,2.setHeadAndPropagate替换了setHead,重点在于此。
int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
满足一系列的判断会调用doReleaseShared。原因是现在加的共享锁,所以需要唤醒后继需要加共享锁的节点去加锁。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
doReleaseShared做的事情就是如果当前head节点是SIGNAL,就唤醒后继节点,否则将状态设为PROPAGATE,如果head节点变化了,就结束。
首先如果所有的加锁都是读锁,是不会有一个链表的,必然要么是有线程加了读锁后又有线程尝试加写锁,或者相反。
如下图所示,N0为head,类型为共享锁,所以首先唤醒N0,unpark后循环继续,在判断h==head前,类型为独占的N1加入进来了,但还未更改状态,_所以N1的状态随时有可能变化,从0变成了SIGNAL
或者还没变化,但如果节点一直处于链表中没有唤醒,它的状态会变为SIGNAL_。
unpark前一瞬间的列表
+------+ +------+ | N0(S)| <---- | N1(E)| |SIGNAL| ----> |DEFAULT| +------+ +------+
unpark后一瞬间的列表,此后循环会尝试将DEFAULT值改为PROPAGATE
+------+ | N1(E) | |DEFAULT| +------+
此后又有类型为共享的N2加入进来,此刻N1的值可能为PROPAGATE或者SIGNAL
+---------+ +------+ | N1(E) | <---- | N2(S)| |PROPAGATE| ----> |DEFAULT| +---------+ +------+
独占锁在释放时判断只要状态不为0,就可以唤醒后继者,所以N1的状态为PROPAGATE也没有关系。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
直接看下已经有线程对加锁的情况下获取写锁的流程。
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
如果已经有线程持有读/写锁, c为65536或者其倍数,假设为65536,那么经过exclusiveCount后,即65536&65535,结果为0。
此时getExclusiveOwnerThread()返回null/持有写锁的线程实例,所以加锁失败,那么剩下逻辑就是上篇对AbstractQueuedSynchronizer acquire方法的分析。
如果是同一线程写锁重入,则进行运算1&65535 结果为1,加上已经重入的次数只要不超过最大值就对c+1
if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires);
如果加写锁失败,那么流程将进入AbstractQueuedSynchronizer acquireQueued中。相关内容及释放过程已经在前文中描述过了。
原创声明,本文系作者授权云+社区发表,未经许可,不得转载。
如有侵权,请联系 yunjia_community@tencent.com 删除。
我来说两句