前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ReentranReadWriteLock源码浅析

ReentranReadWriteLock源码浅析

原创
作者头像
小金狗子
修改2020-03-23 14:39:22
6610
修改2020-03-23 14:39:22
举报

本文为作者个人理解难免有误,欢迎指正

请先阅读ReentranLock源码浅析

ReentranReadWriteLock 的基本构成

代码语言:txt
复制
  private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync;
    public static class ReadLock
    public static class WriteLock

ReentranReadWriteLock的一个要点

内部的Sync中有如下定义。

c指的AbstractQueuedSynchronizer的state。SHARED_UNIT为65536。

代码语言:txt
复制
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

ReadLock加锁

读锁调用的AbstractQueuedSynchronizer中acquireShared,方法的实现在读锁内部,方法内部一段代码注释帮助大家理解

代码语言:txt
复制
  public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

  protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

我们将焦点聚集在tryAcquireShared上。

第一次加锁时state为0,exclusiveCount和sharedCount(c)也是0,经过compareAndSetState(c, c + SHARED_UNIT),c变为65536

如果相同线程再次加读锁,则c的值再次加65536,firstReaderHoldCount变为2

假设此刻c为65536,不同线程尝试加读锁时,sharedCount(c)计算如下为例,将65536表示为32位二进制

00000000000000010000000000000000

无符号右移16位

00000000000000000000000000000001

对应int值1,表示当前读锁加锁数量。此时代码进入

代码语言:txt
复制
    else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }

创建HoldCounter,并将count设置为1,线程号为当前线程的Id。HoldCounter可以理解为某线程加锁的次数。

如果线程因为并发原因导致无法进入if块,比如在CAS c时失败,就进入fullTryAcquireShared,代码和上述大同小异,主要是进入 for (;;){}循环尝试不停加锁。

如果因为别的线程加了写锁,则因为如下代码,加锁失败。

代码语言:txt
复制
if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)

纵观try的过程,两个返回值1代表加锁成功,-1代表加锁失败。加锁失败时就会执行doAcquireShared,将当前线程作为一个节点加入双向链表。

方法的具体代码和之前的很相似。区别主要有两点:1.Node的类型为SHARED,2.setHeadAndPropagate替换了setHead,重点在于此。

代码语言:txt
复制
int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);


private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

满足一系列的判断会调用doReleaseShared。原因是现在加的共享锁,所以需要唤醒后继需要加共享锁的节点去加锁。

代码语言:txt
复制
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

doReleaseShared做的事情就是如果当前head节点是SIGNAL,就唤醒后继节点,否则将状态设为PROPAGATE,如果head节点变化了,就结束。

首先如果所有的加锁都是读锁,是不会有一个链表的,必然要么是有线程加了读锁后又有线程尝试加写锁,或者相反。

如下图所示,N0为head,类型为共享锁,所以首先唤醒N0,unpark后循环继续,在判断h==head前,类型为独占的N1加入进来了,但还未更改状态,_所以N1的状态随时有可能变化,从0变成了SIGNAL

或者还没变化,但如果节点一直处于链表中没有唤醒,它的状态会变为SIGNAL_。

unpark前一瞬间的列表

代码语言:txt
复制
        +------+        +------+ 
        | N0(S)| <----  | N1(E)| 
        |SIGNAL| ---->  |DEFAULT|
        +------+        +------+ 

unpark后一瞬间的列表,此后循环会尝试将DEFAULT值改为PROPAGATE

代码语言:txt
复制
 +------+ 
 | N1(E) | 
 |DEFAULT|
 +------+ 

此后又有类型为共享的N2加入进来,此刻N1的值可能为PROPAGATE或者SIGNAL

代码语言:txt
复制
        +---------+        +------+ 
        |  N1(E)  | <----  |  N2(S)| 
        |PROPAGATE| ---->  |DEFAULT|
        +---------+        +------+ 

独占锁在释放时判断只要状态不为0,就可以唤醒后继者,所以N1的状态为PROPAGATE也没有关系。

代码语言:txt
复制
 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

WriteLock加锁

直接看下已经有线程对加锁的情况下获取写锁的流程。

代码语言:txt
复制
 if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
代码语言:txt
复制
protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

如果已经有线程持有读/写锁, c为65536或者其倍数,假设为65536,那么经过exclusiveCount后,即65536&65535,结果为0。

此时getExclusiveOwnerThread()返回null/持有写锁的线程实例,所以加锁失败,那么剩下逻辑就是上篇对AbstractQueuedSynchronizer acquire方法的分析。

如果是同一线程写锁重入,则进行运算1&65535 结果为1,加上已经重入的次数只要不超过最大值就对c+1

代码语言:txt
复制
if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);

如果加写锁失败,那么流程将进入AbstractQueuedSynchronizer acquireQueued中。相关内容及释放过程已经在前文中描述过了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 本文为作者个人理解难免有误,欢迎指正
  • 请先阅读ReentranLock源码浅析
  • ReentranReadWriteLock 的基本构成
  • ReentranReadWriteLock的一个要点
  • ReadLock加锁
  • WriteLock加锁
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档