前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS源码分析之ReentrantReadWriteLock

AQS源码分析之ReentrantReadWriteLock

作者头像
山行AI
发布2020-03-25 18:07:35
5210
发布2020-03-25 18:07:35
举报
文章被收录于专栏:山行AI山行AI

读写锁:java.util.concurrent.locks.ReentrantReadWriteLock,它可以解决有线程在写但是其他线程还能读取的问题。先了解一下读写状态的设计. 我们知道 AQS中有一个状态值, 比如在 ReentrantLock中表示持有锁的线程重入了多少次. 但是在 ReentrantReadWriteLock中有读锁和写锁因此需要划分,所以高 16位代表读锁的状态,低 16位代表写锁的状态.

结构图

锁的属性

代码语言:javascript
复制
 /** Inner class providing readlock */    private final ReentrantReadWriteLock.ReadLock readerLock;    /** Inner class providing writelock */    private final ReentrantReadWriteLock.WriteLock writerLock;    /** Performs all synchronization mechanics */    final Sync sync;
  • readerLock: 读锁
  • writerLock: 写锁
  • sync: 同步机制

构造方法

代码语言:javascript
复制
    public ReentrantReadWriteLock() {        // 默认是非公平锁        this(false);    }
    public ReentrantReadWriteLock(boolean fair) {        // 根据是否公平性,对sync赋不同的值        sync = fair ? new FairSync() : new NonfairSync();        // 读锁        readerLock = new ReadLock(this);        // 写锁        writerLock = new WriteLock(this);    }
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

实例化的时候会根据fair值的不同来创建不同的sync,代表着同步的公平性与非公平性。同时实例化读锁和写锁。

FairSync与NonfairSync

java.util.concurrent.locks.ReentrantReadWriteLock.Sync
代码语言:javascript
复制
abstract static class Sync extends AbstractQueuedSynchronizer {     static final int SHARED_SHIFT   = 16;        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        /** 返回c的高16位  读状态*/        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }        /** 返回c的低16位  写状态*/        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    // 本地线程计数器    private transient ThreadLocalHoldCounter readHolds;    // 缓存的计数器    private transient HoldCounter cachedHoldCounter;    // 第一个读线程    private transient Thread firstReader = null;    // 第一个读线程的计数    private transient int firstReaderHoldCount;}

可以看出Sync继承自AbstractQueuedSynchronizer。两个获取状态的方法:

  • sharedCount方法:获取读锁的状态;
  • exclusiveCount方法:获取写锁的状态。

Sync内部类

Sync类内部存在两个内部类,分别为HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要与读锁配套使用,其中,HoldCounter源码如下:

代码语言:javascript
复制
// 计数器static final class HoldCounter {    // 计数    int count = 0;    // Use id, not reference, to avoid garbage retention    // 获取当前线程的TID属性的值,不使用线程引用,防止GC Root时因为留有引用而无法回收    final long tid = getThreadId(Thread.currentThread());}

HoldCounter主要有两个属性,count和tid,其中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。ThreadLocalHoldCounter的源码如下:

代码语言:javascript
复制
// 本地线程计数器static final class ThreadLocalHoldCounter    extends ThreadLocal<HoldCounter> {    // 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值    public HoldCounter initialValue() {        return new HoldCounter();    }}

ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。它们的初始化在Sync的构造方法中:

代码语言:javascript
复制
// 构造函数Sync() {    // 本地线程计数器    readHolds = new ThreadLocalHoldCounter();    // 设置AQS的状态    setState(getState()); // ensures visibility of readHolds}

主要进行了ThreadLocalHoldCounter的初始化和AQS状态的初始化。

java.util.concurrent.locks.ReentrantReadWriteLock.FairSync
代码语言:javascript
复制
    // 同步的公平版本    static final class FairSync extends Sync {        private static final long serialVersionUID = -2274990926593161451L;        final boolean writerShouldBlock() {            return hasQueuedPredecessors();        }        final boolean readerShouldBlock() {            return hasQueuedPredecessors();        }    }

在公平版本里的两个方法writerShouldBlock方法和readerShouldBlock方法中调用的都是hasQueuedPredecessors方法,该方法的实现在Sync的父类AbstractQueuedSynchronizer中,在分析ReentrentLock源码时有详细地分析过,主要是判断当前AQS同步队列中是否有等待获取锁时间更长的节点,维持这个队列FIFO。

java.util.concurrent.locks.ReentrantReadWriteLock.NonfairSync
代码语言:javascript
复制
static final class NonfairSync extends Sync {        private static final long serialVersionUID = -8159625535654395037L;        final boolean writerShouldBlock() {            // writers必须是第一位的,可以乱闯            return false; // writers can always barge        }        final boolean readerShouldBlock() {           //为了避免无限期地饿死writer,如果暂时看起来队列头的线程(如果存在的话)是一个等待中的writer则阻塞。这只是一种概率效应,因为一个新的reader在当存在一个writer排在其他还没有从队列中出队的可行的readers后面的时候不会阻塞。            return apparentlyFirstQueuedIsExclusive();        }    }

这个是Sync的非公平版本。

我们来分析下java.util.concurrent.locks.AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive方法:

代码语言:javascript
复制
// 如果队列中的第一个入队列的线程处于排他模式的等待状态则返回true。如果这个方法返回true,并且当前线程正准备去以共享模式去获取锁(这就是,这个方法通过tryAcquireShared方法触发)然后它保证了当前线程不是第一个入队的线程。仅仅用于读写锁的一个启发。    final boolean apparentlyFirstQueuedIsExclusive() {        Node h, s;        // head为头节点并且head节点和其后继节点不能为null,并且这个后继节点是处于排他模式的且它的线程不为null时返回true        return (h = head) != null &&            (s = h.next)  != null &&            !s.isShared()         &&            s.thread != null;    }

目的是不应该让写锁始终等待。作为一个启发式方法用于避免可能的写线程饥饿,这只是一种概率性的作用,因为如果有一个等待的写线程在其他尚未从队列中出队的读线程后面等待,那么新的读线程将不会被阻塞。

写锁

构造方法

代码语言:javascript
复制
      protected WriteLock(ReentrantReadWriteLock lock) {            sync = lock.sync;        }

加锁

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock方法
代码语言:javascript
复制
        // 获取写锁        // 如果没有读锁或者写锁被其他线程占有时获取写锁会立即返回,设置写锁拥有数量为1          // 如果当前线程已经获取到写锁,然后占有数量会加1,并且立即返回        // 如果锁是由另一个线程持有的,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到获取了写锁为止,此时写锁的保持计数被设置为一个。        public void lock() {            sync.acquire(1);        }
  • 如果没有读锁或者写锁被其他线程占有时获取写锁会立即返回,设置写锁拥有数量为1;
  • 如果当前线程已经获取到写锁,然后占有数量会加1,并且立即返回;
  • 如果锁是由另一个线程持有的,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到获取了写锁为止,此时写锁的保持计数被设置为一个。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire方法
代码语言:javascript
复制
 public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }

关于这个方法在ReentrantLock源码分析中已经详细分析过,这里不再过多地进行分析,主要来看下这里tryAcquire方法的实现。

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire
代码语言:javascript
复制
 @ReservedStackAccess        protected final boolean tryAcquire(int acquires) {            // 预排:            // 1. 如果读的数量不为0或者写的数量不为0并且拥有锁的不是当前线程,会失败。            // 2. 如果计数饱和,则失败(这只会在count已经不为0时出现)            // 3. 否则,如果该线程是可重入获取或队列策略允许,则有资格进行锁定。如果这样,更新state的值并且设置owner            Thread current = Thread.currentThread();            // 当前AQS的状态            int c = getState();            // 获取写锁的数量            int w = exclusiveCount(c);            //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁            if (c != 0) {                // (Note: if c != 0 and w == 0 then shared count != 0)                // 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false;                // 如果写锁状态不为0且写锁没有被当前线程持有返回false                if (w == 0 || current != getExclusiveOwnerThread())                    return false;                // w为当前已经存在的写锁数量,exclusiveCount(acquires)为将要获取的写锁的数量,而MAX_COUNT的值为65536;这个判断的作用是校验同一线程获取写锁是否超过最大次数(65535),支持可重入                if (w + exclusiveCount(acquires) > MAX_COUNT)                    throw new Error("Maximum lock count exceeded");                // Reentrant acquire                // 获取锁                // 能进入到这里代表current的值与getExclusiveOwnerThread()是一样的,所以这里只需要设置AQS的state值就行了                setState(c + acquires);                return true;            }            // 到这里说明此时c=0,读锁和写锁都没有被获取,writerShouldBlock方法用于判断是否需要阻塞            // 这里需要注意当writerShouldBlock为true时,会直接返回false,如果writerShouldBlock值为false,那么会进入state的CAS操作,CAS操作成功后才会进入下面的setExclusiveOwnerThread            if (writerShouldBlock() ||                !compareAndSetState(c, c + acquires))                return false;            // 这里会将当前线程设置为独占线程            setExclusiveOwnerThread(current);            return true;        }
  • 当AQS的状态值不为0时,证明读锁或写锁已经被其他线程占有,而如果此时读锁的数量为0或者当前线程不是锁的独占线程时证明是有其他线程在占有锁,所以tryAcquire失败,返回false。否则,如果当前线程获取的写锁的数量不超过独占锁允许的最大数量则允许获取。
  • 当AQS的状态值为0时,当写不需要阻塞时将会尝试CAS设置state的值,设置成功将会把当前线程设置为独占线程。
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#tryLock():
代码语言:javascript
复制
 public boolean tryLock() {     return sync.tryWriteLock(); }
    // 两种模式下均允许插入。除了没有调用writerShouldBlock的调用外,这与tryAcquire的作用相同    @ReservedStackAccess    final boolean tryWriteLock() {        Thread current = Thread.currentThread();        int c = getState();        if (c != 0) {            int w = exclusiveCount(c);            if (w == 0 || current != getExclusiveOwnerThread())                return false;            if (w == MAX_COUNT)                throw new Error("Maximum lock count exceeded");        }        if (!compareAndSetState(c, c + 1))            return false;        setExclusiveOwnerThread(current);        return true;    }

除了没有调用writerShouldBlock方法外,这与tryAcquire的作用相同。

解锁

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock方法
代码语言:javascript
复制
 public void unlock() {     sync.release(1); }

release方法调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#release,这个方法的源码在之前的ReentrantLock源码中已经分析过,这里不再作详细分析。release方法的作用是释放掉指定数量的当前线程的许可,并尝试唤醒当前AQS队列头节点的后继节点(如果有的话)。

读锁

构造方法

代码语言:javascript
复制
  protected ReadLock(ReentrantReadWriteLock lock) {            sync = lock.sync;        }

加锁

java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock
代码语言:javascript
复制
        // 获取读锁;        // 在写锁没有被其他线程占有时获取读锁会立即返回        // 如果写锁被其他线程占有那么当前线程出于线程调度目的而被禁用,并且在获取读锁之前一直处于休眠状态        public void lock() {            sync.acquireShared(1);        }
  • 在写锁没有被其他线程占有时获取读锁会立即返回;
  • 如果写锁被其他线程占有那么当前线程会出于线程调度目的而被禁用,并且在获取读锁之前一直处于休眠状态。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
代码语言:javascript
复制
// 以共享模式获取,忽略线程中断的影响。通过至少调用一次tryAcquireShared来实现,成功时返回,不然线程将会入队列,可能重复地阻塞和解除阻塞,调用tryAcquireShared方法直到成功。    public final void acquireShared(int arg) {        if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);    }

以共享模式获取,忽略线程中断的影响。通过至少调用一次tryAcquireShared来实现,成功时返回,不然线程将会进入队列,可能重复地阻塞和解除阻塞,调用tryAcquireShared方法直到成功(这个体现在doAcquireShared方法内部)。acquireShared()首先会通过tryAcquireShared()来尝试获取锁。尝试成功的话,则不再做任何动作(因为已经成功获取到锁了)。尝试失败的话,则通过doAcquireShared()来获取锁。doAcquireShared()会获取到锁了才返回。

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared
代码语言:javascript
复制
   @ReservedStackAccess        protected final int tryAcquireShared(int unused) {            // 1. 如果写锁被其他线程获取到了,代表其他线程正在独占写,其他线程不能读,返回失败。            // 2. 否则,此线程有资格进入锁定wrt状态,因此轮问是否由于队列策略而应该阻塞。如果不是,尝试按CASing状态授予许可并更新计数。请注意,此步骤不检查重入获取的情况,这会推迟到完整版本方法,以避免在更典型的非重入情况下检查占有数量。            // 3. 如果第2步失败,或者由于线程显然不符合条件或者CAS失败或计数饱和,将会交给完全重试版本的获取方法            Thread current = Thread.currentThread();            int c = getState();            // 这里要考虑写锁降级为读锁的情况,锁降级是指当前线程拥有写锁,在写锁还没有释放时去获取读锁,这时exclusiveCount(c) != 0为true,但是 getExclusiveOwnerThread() != current返回false,所以会继续到下面的流程中            if (exclusiveCount(c) != 0 &&                getExclusiveOwnerThread() != current)                // 如果写锁被其他线程获取到了,代表其他线程正在独占写,其他线程不能读,返回失败。                return -1;            // 共享锁的数量            int r = sharedCount(c);            // 1. 判断读锁是否需要等待,这个在公平和非公平锁中有不同的实现;            // 2. 持有读锁线程小于最大值 65535            // 3. cas设置读锁的状态            if (!readerShouldBlock() &&                r < MAX_COUNT &&                compareAndSetState(c, c + SHARED_UNIT)) {                if (r == 0) {                    // r == 0 代表当前线程是第一个读锁线程                    firstReader = current;                    // firstReader的计数数量设置为1                    firstReaderHoldCount = 1;                } else if (firstReader == current) {                    // 如果第一个读线程就是当前获取读锁的线程,只需要将firstReader的计数器递增                    firstReaderHoldCount++;                } else {// 读锁数量不为0并且不为当前线程                    // 获取HoldCounter(用于保存线程的重入数)                    HoldCounter rh = cachedHoldCounter;                    // HoldCounter计数器为空或者计数器的tid不为当前运行线程的tid                    if (rh == null ||                        rh.tid != LockSupport.getThreadId(current))                        // 去readHolds中获取                        cachedHoldCounter = rh = readHolds.get();                    else if (rh.count == 0)// 如果计数为0                        // 将rh加入到readHolds中                        readHolds.set(rh);                    // HoldCounter计数器加1                    rh.count++;                }                return 1;            }            return fullTryAcquireShared(current);        }

        // 完整版本的获取读锁的方法,可处理tryAcquireShared中未处理的CAS丢失和可重入的读操作。        final int fullTryAcquireShared(Thread current) {            /*             * This code is in part redundant with that in             * tryAcquireShared but is simpler overall by not             * complicating tryAcquireShared with interactions between             * retries and lazily reading hold counts.             */            HoldCounter rh = null;            for (;;) {// 注意,这是在一个无限的特循中进行的                int c = getState();                if (exclusiveCount(c) != 0) {// 写线程数量不为0                    if (getExclusiveOwnerThread() != current)// 独占线程不是当前线程                        return -1;                    // else we hold the exclusive lock; blocking here                    // would cause deadlock.                } else if (readerShouldBlock()) {// 写线程数量为0并且读线程需要被阻塞                    // Make sure we're not acquiring read lock reentrantly                    if (firstReader == current) {//如果当前线程为第一个读线程                        // assert firstReaderHoldCount > 0;                    } else {// 当前线程不是firstReader                        if (rh == null) {// 这部分操作与上面的方法中的相同,获取每个线程的重入数量计数器,在重入数量为0时从ThreadLocal对象中清除                            rh = cachedHoldCounter;                            if (rh == null ||                                rh.tid != LockSupport.getThreadId(current)) {                                rh = readHolds.get();                                if (rh.count == 0)                                    readHolds.remove();                            }                        }                        if (rh.count == 0)                            return -1;                    }                }                // 如果此时读锁的数量达到最大值,则抛出异常                if (sharedCount(c) == MAX_COUNT)                    throw new Error("Maximum lock count exceeded");                if (compareAndSetState(c, c + SHARED_UNIT)) {// CAS设置获取读锁                    if (sharedCount(c) == 0) {//获取成功,并且读锁数量为0                        // 设置当前线程为firstReader                        firstReader = current;                        // 设置firstReader重入的数量                        firstReaderHoldCount = 1;                    } else if (firstReader == current) {                        firstReaderHoldCount++;                    } else {                        if (rh == null)                            rh = cachedHoldCounter;                        if (rh == null ||                            rh.tid != LockSupport.getThreadId(current))                            rh = readHolds.get();                        else if (rh.count == 0)                            readHolds.set(rh);                        rh.count++;                        cachedHoldCounter = rh; // cache for release                    }                    return 1;                }            }        }
  1. 如果写锁被其他线程获取到了,代表其他线程正在独占写,其他线程不能读,返回失败。
  2. 否则,此线程有资格进入锁定wrt状态,因此轮问是否由于队列策略而应该阻塞。如果不是,尝试按CAS对AQS的队列进行共享锁的更新。请注意,此步骤不检查重入的情况,这会推迟到完整版本方法,以避免在更典型的非重入情况下检查占有数量。
  3. 如果第2步失败,或者由于线程显然不符合条件或者CAS失败或计数饱和,将会交给完全重试版本的获取方法fullTryAcquireShared。
  4. fullTryAcquireShared主要处理tryAcquireShared方法中CAS失败或计数饱和或者readerShouldBlock判断为true的获取写锁的问题,它是放在一个无限循环中,只有有结果返回时才会结束。
  5. 这里着重说下这段代码:
代码语言:javascript
复制
if (exclusiveCount(c) != 0 &&                getExclusiveOwnerThread() != current)

有这种一种情况,当前线程已经拥有写锁,那么exclusiveCount(c) != 0返回为true,而此时第二个条件getExclusiveOwnerThread() != current返回false,条件不成立,会继续进入获取读锁的流程。也就是说当前线程已经有读锁之后,还能够继续获取读锁;而如果写锁是被其他线程占有的,此时是不能获取读锁的。

java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared

代码语言:javascript
复制
/**     * Acquires in shared uninterruptible mode.     * @param arg the acquire argument     */    private void doAcquireShared(int arg) {        // 添加一个SHARED模式的节点到AQS队列中        final Node node = addWaiter(Node.SHARED);        boolean interrupted = false;        try {            for (;;) {                // 找到节点的前置节点                final Node p = node.predecessor();                if (p == head) {                    // 如果节点的前置节点是头节点则尝试去获取锁                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        return;                    }                }                // 获取失败是否需要让线程进入等待状态                if (shouldParkAfterFailedAcquire(p, node))                    interrupted |= parkAndCheckInterrupt();            }        } catch (Throwable t) {            cancelAcquire(node);            throw t;        } finally {            if (interrupted)                selfInterrupt();        }    }
    // 设置当前节点为AQS队列的头节点,并且检查后继节点是否是处于共享模式的等待状态,如果此时传播> 0或PROPAGATE状态被设置则传播    private void setHeadAndPropagate(Node node, int propagate) {        Node h = head; // Record old head for check below        setHead(node);        /*         * Try to signal next queued node if:         *   Propagation was indicated by caller,         *     or was recorded (as h.waitStatus either before         *     or after setHead) by a previous operation         *     (note: this uses sign-check of waitStatus because         *      PROPAGATE status may transition to SIGNAL.)         * and         *   The next node is waiting in shared mode,         *     or we don't know, because it appears null         *         * The conservatism in both of these checks may cause         * unnecessary wake-ups, but only when there are multiple         * racing acquires/releases, so most need signals now or soon         * anyway.         */        if (propagate > 0 || h == null || h.waitStatus < 0 ||            (h = head) == null || h.waitStatus < 0) {            Node s = node.next;            // 如果后续节点为null或者它处于shared状态则唤醒后继节点并确保传播            if (s == null || s.isShared())                doReleaseShared();        }    }

  /**     * Release action for shared mode -- signals successor and ensures     * propagation. (Note: For exclusive mode, release just amounts     * to calling unparkSuccessor of head if it needs signal.)     */    // 共享模式下的释放动作-表示唤醒后继节点并确保传播    private void doReleaseShared() {        /*         * Ensure that a release propagates, even if there are other         * in-progress acquires/releases.  This proceeds in the usual         * way of trying to unparkSuccessor of head if it needs         * signal. But if it does not, status is set to PROPAGATE to         * ensure that upon release, propagation continues.         * Additionally, we must loop in case a new node is added         * while we are doing this. Also, unlike other uses of         * unparkSuccessor, we need to know if CAS to reset status         * fails, if so rechecking.         */        for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    // cas头节点的waitStatus                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))                        continue;            // loop to recheck cases                    unparkSuccessor(h);                }                else if (ws == 0 &&                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        }    }

doAcquireShared()的作用是获取共享锁。它会首先创建线程对应的AQS队列的节点,然后将该节点添加到AQS队列中。AQS队列是管理获取锁的等待线程的队列。如果“当前线程”是AQS队列的表头,则尝试获取共享锁;否则,则需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的。

java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#tryLock()

代码语言:javascript
复制
public boolean tryLock() {    return sync.tryReadLock();}
@ReservedStackAccessfinal boolean tryReadLock() {    // 当前线程    Thread current = Thread.currentThread();    for (;;) {        int c = getState();        // 1. 排他锁的数量不为0;        // 2. 当前线程不是排他锁的拥有线程        if (exclusiveCount(c) != 0 &&            getExclusiveOwnerThread() != current)            return false;        // 共享锁的数量        int r = sharedCount(c);        // 共享锁数量达到最大值时抛出异常        if (r == MAX_COUNT)            throw new Error("Maximum lock count exceeded");        // cas设置共享锁state        if (compareAndSetState(c, c + SHARED_UNIT)) {            if (r == 0) {                // 当前线程为firstReader                firstReader = current;                firstReaderHoldCount = 1;            } else if (firstReader == current) {                firstReaderHoldCount++;            } else {                HoldCounter rh = cachedHoldCounter;                if (rh == null ||                    rh.tid != LockSupport.getThreadId(current))                    cachedHoldCounter = rh = readHolds.get();                else if (rh.count == 0)                    readHolds.set(rh);                rh.count++;            }            return true;        }    }}

该方法与tryAcquireShared大致相同,只是不需要判断readerShouldBlock,竞争失败也不需要进入fullTryAcquireShared方法再尝试获取锁。然后它是在一个无限for循环中的,只有当有结果返回时才会停止循环。

释放锁

java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock:

代码语言:javascript
复制
public void unlock() {    sync.releaseShared(1);}

实际上调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared方法,代码如下:

代码语言:javascript
复制
 public final boolean releaseShared(int arg) {        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReleaseShared

代码语言:javascript
复制
 protected final boolean tryReleaseShared(int unused) { 2     // 获取当前线程 3     Thread current = Thread.currentThread(); 4     if (firstReader == current) { // 当前线程为第一个读线程 5         // assert firstReaderHoldCount > 0; 6         if (firstReaderHoldCount == 1) // firstReader重入的数量为1 7             firstReader = null; 8         else // 减少firstReader重入的数量 9             firstReaderHoldCount--;10     } else { // 当前线程不为第一个读线程11         // 获取缓存的线程重入计数器12         HoldCounter rh = cachedHoldCounter;13         if (rh == null || rh.tid != getThreadId(current)) // 重入计数器为空或者计数器的tid不为当前正在运行的线程的tid14             // 获取当前线程对应的重入计数器15             rh = readHolds.get();16         // 获取计数17         int count = rh.count;18         if (count <= 1) { // 计数小于等于119             // 移除20             readHolds.remove();21             if (count <= 0) // 计数小于等于0,抛出异常22                 throw unmatchedUnlockException();23         }24         // 减少线程重入计数25         --rh.count;26     }27     for (;;) { // 无限循环28         // 获取状态29         int c = getState();30         // 获取状态31         int nextc = c - SHARED_UNIT;32         if (compareAndSetState(c, nextc)) // 比较并进行设置33             // Releasing the read lock has no effect on readers,34             // but it may allow waiting writers to proceed if35             // both read and write locks are now free.36             return nextc == 0;37     }38 }

此方法表示读锁线程释放锁。首先判断当前线程是否为第一个读线程firstReader,若是,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,若是,则设置第一个读线程firstReader为空,否则,将第一个读线程占有的资源数firstReaderHoldCount减1;若当前线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器 ),若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器,如果计数器的计数count小于等于1,则移除当前线程对应的计数器,如果计数器的计数count小于等于0,则抛出异常,之后再减少计数即可。无论何种情况,都会进入无限循环,该循环可以确保成功设置状态state。

读写锁的特点

而读写锁有以下三个重要的特性:

(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。

(2)重进入:读锁和写锁都支持线程重进入。

(3)在线程持有读锁的情况下,该线程不能取得写锁。在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。因为当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。如果写锁被其他线程获取到了,代表其他线程正在独占写,之外的线程不能读,返回失败。

(4)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。锁降级指的是写锁降级成为读锁。锁降级是指把持住当前拥有的写锁的同时,再获取到读锁,随后释放写锁的过程。以下是oracle官网的对于锁降级的示例代码:

代码语言:javascript
复制
class CachedData {   Object data;   volatile boolean cacheValid;   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
   void processCachedData() {     rwl.readLock().lock();     if (!cacheValid) {        // Must release read lock before acquiring write lock        rwl.readLock().unlock();        rwl.writeLock().lock();        try {          // Recheck state because another thread might have          // acquired write lock and changed state before we did.          if (!cacheValid) {            data = ...            cacheValid = true;          }          // Downgrade by acquiring read lock before releasing write lock          rwl.readLock().lock();        } finally {          rwl.writeLock().unlock(); // Unlock write, still hold read        }     }
     try {       use(data);     } finally {       rwl.readLock().unlock();     }   } }

代码中声明了一个volatile类型的cacheValid变量,保证其可见性。首先获取读锁,如果cache不可用,则释放读锁,获取写锁,在更改数据之前,再检查一次cacheValid的值进行double-check,然后修改数据,将cacheValid置为true,然后在释放写锁前获取读锁

此时,cache中数据可用,处理cache中数据,最后释放读锁。这个过程就是一个完整的锁降级的过程,目的是保证数据可见性。如果当前的线程C在修改完cache中的数据后,没有获取读锁而是直接释放了写锁,那么假设此时另一个线程T获取了写锁并修改了数据,那么C线程无法感知到数据已被修改,则数据出现错误。如果遵循锁降级的步骤,线程C在释放写锁之前获取读锁,那么线程T在获取写锁时将被阻塞,直到线程C完成数据处理过程,释放读锁。

这段说明来自: https://www.cnblogs.com/takemybreathaway/articles/9399914.html

锁降级中读锁的获取是否必要呢?

答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁, 假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。

ReadWriteLock与StampedLock

ReadWriteLock可以解决有线程在写但是其他线程还能读取的问题。如果我们深入分析 ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock

StampedLockReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 结构图
  • 锁的属性
  • 构造方法
  • FairSync与NonfairSync
    • java.util.concurrent.locks.ReentrantReadWriteLock.Sync
      • Sync内部类
        • java.util.concurrent.locks.ReentrantReadWriteLock.FairSync
        • java.util.concurrent.locks.ReentrantReadWriteLock.NonfairSync
    • 写锁
      • 构造方法
        • 加锁
          • java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock方法
          • java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire方法
          • java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire
          • java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#tryLock():
        • 解锁
          • java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock方法
      • 读锁
        • 构造方法
          • 加锁
            • java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock
            • java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
            • java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared
          • java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
            • java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#tryLock()
            • 释放锁
              • java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock:
                • java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReleaseShared
                • 读写锁的特点
                • ReadWriteLock与StampedLock
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档