前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java读写锁浅析

Java读写锁浅析

作者头像
luoxn28
发布2021-05-13 10:56:40
2.7K0
发布2021-05-13 10:56:40
举报
文章被收录于专栏:TopCoderTopCoder

Java读写锁,也就是ReentrantReadWriteLock,其包含了读锁和写锁,其中读锁是可以多线程共享的,即共享锁,而写锁是排他锁,在更改时候不允许其他线程操作。读写锁底层是同一把锁(基于同一个AQS),所以会有同一时刻不允许读写锁共存的限制。

读写锁 示例代码如下:

代码语言:javascript
复制
public static void main(String[] args) {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    Thread t1 = new Thread(() -> {
        readLock.lock();
        System.out.println(Thread.currentThread().getName() + " read lock ok");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        readLock.unlock();
    });

    Thread t2 = new Thread(() -> {
        readLock.lock();
        System.out.println(Thread.currentThread().getName() + " read lock ok");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        readLock.unlock();
    });

    Thread t3 = new Thread(() -> {
        writeLock.lock();
        System.out.println(Thread.currentThread().getName() + " write lock ok");
        writeLock.unlock();
    });

    t1.start();
    t2.start();
    t3.start();
}

输出结果为:

代码语言:javascript
复制
Thread-0 read lock ok
Thread-1 read lock ok
Thread-2 write lock ok // 1s后才打印

Java读写锁主要是基于AQS(队列同步器)的独占和共享来完成功能的,AQS使用一个int成员变量(private volatile int state)表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。关于AQS具体可以参考:AQS是如何控制线程的

AQS只有一个int类型的state同步状态, 那它是如何维护独占和共享模式对应的状态的呢?由于int是32位的,因此将32一分为二,高16位给共享状态,低16位给独占状态,对应的代码如下:

代码语言:javascript
复制
abstract static class Sync extends AbstractQueuedSynchronizer {
    /*
     * Read vs write count extraction constants and functions.
     * Lock state is logically divided into two unsigned shorts:
     * The lower one representing the exclusive (writer) lock hold count,
     * and the upper the shared (reader) hold count.
     */

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

为什么要使用高16位给共享状态,低16位给独占状态呢?其实理论上来讲,低16位作为共享状态也是OK的,但是由于AQS的独占模式下(比如ReentrantLock)是将整个int的state进行操作的(+1或-1),为了保持统一,JDK这里就将低16位作为独占状态,可以进行+1或-1操作,相应的对于共享状态的操作就是+/- compareAndSetState(c, c + SHARED_UNIT))了。

公平模式和非公平模式

对于读写锁来说,如果已加读锁,写锁会阻塞;如果已加写锁,读锁会阻塞。

非公平锁模式,可提高加锁效率(这也是一般的锁模式是非公平的原因),但是可能会造成阻塞线程一直获取不到锁。因此从原理上来讲,读写锁的非公平模式下的读锁插队竞争锁会导致等待写锁的线程一致阻塞(线程饥饿)。

那读写锁是如何处理的呢?在非公平锁情况下,允许写锁插队,也允许读锁插队,但是读锁插队的前提是队列中的头节点不能是想获取写锁的线程。在公平模式下,都是严格按照请求锁顺序进行的。

读加解锁流程

读锁ReentrantReadWriteLock.ReadLock的lock操作核心逻辑在方法java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared中,首先获取state状态,如果已有独占它的线程,直接返回并将执行方法doAcquireShared该线程添加到同步队列中;否则CAS将读状态进行"+1"操作(+/- compareAndSetState(c, c + SHARED_UNIT))),然后增加对应读线程统计值firstReaderHoldCount(对于第一个操作读状态线程)或者HoldCounter(后续操作读状态线程,ThreadLocal变量,HoldCounter统计值是为了防止多次unlock问题)。

代码语言:javascript
复制
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current) // 如果当前线程已占用写锁,还可以继续加读锁
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT && // 读锁最大同时出现个数为MAX_COUNT
        compareAndSetState(c, c + SHARED_UNIT)) { 
        // 统计值增加
        if (r == 0) {
            // 未加锁状态
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

读锁ReentrantReadWriteLock.ReadLock的unlock操作核心逻辑在方法java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReleaseShared中,首先更新统计值,然后获取state,CAS更新state,最后唤醒同步队列中阻塞线程。

代码语言:javascript
复制
protected final boolean tryReleaseShared(int unused) {
    // 统计值更新
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 循环CAS -SHARED_UNIT 更新值
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

写加解锁流程

写锁java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock的lock操作核心逻辑在方法java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire中,首先获取state,如果已经被占用并且是当前线程占用,那么直接更新state返回;如果不是当前线程占用,返回false添加到阻塞队列中阻塞等待;否则CAS设置state,设置成功返回true,设置不成功表示已被其他线程抢先占用了,返回false添加到阻塞队列中阻塞等待。

代码语言:javascript
复制
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 当前线程可重入
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires)) -- notice
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

如果这里调用compareAndSetState(c, c + acquires))设置不成功,表示已被其他线程抢先占用state了,这时需要添加到阻塞队列中阻塞等待,这里思考一个问题:如果此时还未添加到阻塞队列中,但是之前抢先占用state的线程已经释放state应该怎么办呢?这时需要在添加阻塞队列时需再次执行下tryAcquire来获取state。

写锁java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock的unlock操作核心逻辑在方法java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease中,首先判断当前线程是否占有该state,然后释放同步队列中阻塞线程即可。

代码语言:javascript
复制
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

最后思考一个问题,读写锁,写锁被线程A占用时,线程B和C的读锁被阻塞在同步队列,如果线程A unlock后,线程B和C都会被唤醒么,如果是,那么分别都是谁唤醒的

先说结论:B和C都会被唤醒,A唤醒B,B唤醒C。首先线程A unlock时会唤醒下一个线程,如果同步队列中线程B排在线程C前面,那么就会唤醒线程B;线程B被唤醒后,首先会获取state,成功后会唤醒下一个紧挨着的阻塞等待的线程。这块的具体逻辑可以参考:AbstractQueuedSynchronizer#setHeadAndPropagateAbstractQueuedSynchronizer#doReleaseShared,这里就不再赘述了。

这里还可以引申出另外一个问题,如果同步队列中线程C后面又有一个线程D(获取写锁),那么C会唤醒D么,其实不会的,只不过D被唤醒前发现不是共享的会被继续阻塞而已。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-05-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TopCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 公平模式和非公平模式
  • 读加解锁流程
  • 写加解锁流程
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档