首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

并发编程之ReentrantReadWriteLock

这篇文章主要来讲讲ReentrantReadWriteLock,他与

ReentrantLock 有点联系,甚至说,它可以代替ReentrantLock出现,从名字上面可以看出,它提供了读锁和写锁。

What is ReentrantReadWriteLock

读写锁,记得学习AbstractQueuedSynchronizer的时候,里面两种锁实现(排他锁和共享锁),而ReentrantLock,则只实现了其中的排他锁。另外在排他锁基础上,又分为了公平锁和非公平锁。

而ReentrantReadWriteLock,则是利用AQS实现了排他锁和共享锁,即写锁(WriteLock)和读锁(ReadLock)。其中,你可以简单理解,ReentrantReadWriteLock里面的写锁,就是排他锁。

ReentrantReadWriteLock类结构

先简单用几章图来了解下ReentrantReadWriteLock的类结构:

在ReentrantReadWriteLock里面,有下列变量:

sync,它是继承与AQS,并对里面两种性质锁都给出了实现。

readLock,读锁

writeLock,写锁

ReadLock和WriteLock,均实现Lock接口,代表它是一个锁,其中里面的方法,都是间接调用sync的方法。

最开始我看ReentrantReadWriteLock的时候,我在想,既然是读写锁,为什么不分开2个类呢?一个读锁类,一个写锁类。但是后面慢慢学习中,我发现我这种思路并不可行,因为你把两个锁分开了,也就是你可能需要两个AQS,这样一来,你每次想知道是否有读锁的时候,都会去访问另一个写锁,而写锁又会变更,又不是在同一个类,所以又是一个并发问题。而且对于读锁的可重入问题,也不好解决,又不知道什么时候没有读锁了。

所以还是Doug Lea的思路比较好。

这里先讲讲ReentrantReadWriteLock的基本框架:

首先,在其内部维持这一对锁,读锁和写锁。

读锁和写锁可以分为公平和非公平。

写锁,可以可重入,这里和ReentrantLock一样。

读锁,也可以可重入,这里是用HoldCounter去记录每一个线程的重入数量

HoldCounter

我们发现,在Sync里面,有个HoldCounter的私有内部类。HoldCount是什么呢?

主要用于读锁。

先看看它的定义:

static final class HoldCounter {

int count = 0;

// Use id, not reference, to avoid garbage retention

//用id而不是引用来避免垃圾回收

final long tid = getThreadId(Thread.currentThread());

}

这个HoldCounter是用来干嘛的呢?

前面说过,读锁,也有一个重入的概念,也就是某一个线程获取读锁时,他会获得一个HoldCounter,然后当他再次获取这把读锁时,就不会再次获得HoldCounter,而是将以前获得的这个HoldCounter的count自增1。当失去的时候,就自1,如果当这个count为0时,这个HoldCounter就会被ReentrantReadWriteLock抛弃。

上面有一段加粗的文字,那么有个问题,如何实现这样一个HoldCounter呢?是线程私有的,并且还是能够被ReentranReadWriteLock拥有呢?

这里就要说说ThreadLocal类了。

ThreadLocal:

ThreadLocal并不是concurrent包下面的类,而是lang包下面一个类,代表是线程的本地变量,简单说,就是如果多线程下,每个线程都有一个属于线程私有的并且是线程安全的变量。

该类里面有以下几个要点,具体就不细讲,将总结后的要点贴出:

每个线程有个私有的ThreadLocal

某一个条件下使用所有的ThreadLocal,都由一个ThreadLocal里面自实现的map管理,map的hash函数为:key.threadLocalHashCode & (len-1),key就是当前线程的ThreadLocal。

整个实现机制,就是ThreadLocal自己管理自己的map集合。一个新线程需要ThreadLocal,我就找下有没有,没有就给他个新的。

这里看ReentrantReadWriteLock里面的用法:

static final class ThreadLocalHoldCounter

extends ThreadLocal {

public HoldCounter initialValue() {

return new HoldCounter();

}

}

从上面可以看出,ThreadLocalHoldCounter是一个ThreadLocal,并且ThreadLocal里面存的是HoldCounter。

为了保证,每个线程只有一个ThreadLocal,并且如果再次获取读锁时,不会再获取一个ThreadLocal,而是用原来那个,就需要用到上面那个initialValue方法。它是延迟执行的,并不是一旦初始化ThreadLocal就会执行initialValue方法,而是执行get时候才会调用,并且只会调用一次,第二次用时候会使用原来那个ThreadLocal。

接下来就不直接介绍Sync,而是以ReadLock和WriteLock来讲解:

ReadLock

ReadLock里面父类的一个Sync:

private final Sync sync;

protected ReadLock(ReentrantReadWriteLock lock) {

sync = lock.sync;

}

读锁的获取:

lock方法执行流程:

Sync调用父类AQS的acquireShared(1) 。

父类AQS再调用子类的重写方法tryAcquireShared(1)

如果tryAcquireShared获取成功,则获取锁,否则执行doAcquireShared,阻塞式执行,要么获取成功,否则挂起等待。

现在具体讲讲子类Sync重写的tryAcquireShared方法:

protected final int tryAcquireShared(int unused) {

/*

* 如果不是当前线程,则直接失败。

* 尝试去判断是否应该被阻塞,否则的话,就用CAS去更改state变量。

* 如果第二项失败了的话,那么就尝试去自旋方式完成。

*/

Thread current = Thread.currentThread();

int c = getState();

//如果,有写锁,并且不是自己,那么就退出,

if (exclusiveCount(c) != 0 &&

getExclusiveOwnerThread() != current)

//如果此时有写锁。

return -1;

/*反言之,如果有没有写锁,或者有写锁,但是是自己的写锁,那么还是可以获取的,短路与可以判断出来。这里就是锁降级的意思*/

//获取共享锁数量

int r = sharedCount(c);

if (!readerShouldBlock() &&

r

compareAndSetState(c, c + SHARED_UNIT)) {

if (r == 0) {

//头一次,也就既没有写锁,也没有读锁获取的时候。

firstReader = current;

firstReaderHoldCount = 1;

//firstReader就像,每次最新的获取锁的进程。

} else if (firstReader == current) {

//重入增加

firstReaderHoldCount++;

} else {

//已经有写锁获取,并且不是firstReader,那么就从cacheHoldCounter里面,去寻找当前变量的threadLocal。

HoldCounter rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current))

cachedHoldCounter = rh = readHolds.get();

else if (rh.count == 0)

readHolds.set(rh);

rh.count++;

}

return 1;

}

return fullTryAcquireShared(current);

}

fullTryAcquireShared方法意思就是,如果前面的都没有过,那么久采取用自旋的方式去获取锁。

具体 看如下代码:

final int fullTryAcquireShared(Thread current) {

HoldCounter rh = null;

for (;;) {

int c = getState();

if (exclusiveCount(c) != 0) {

//排他锁有的话

if (getExclusiveOwnerThread() != current)

return -1;

// else we hold the exclusive lock; blocking here

// would cause deadlock.

} else if (readerShouldBlock()) {

//需要被阻塞。

// Make sure we're not acquiring read lock reentrantly

if (firstReader == current) {

// assert firstReaderHoldCount > 0;

} else {

if (rh == null) {

rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current)) {

rh = readHolds.get();

//如果readHolds为0,即没有重入锁了,那么就删除它。

if (rh.count == 0)

readHolds.remove();

}

}

if (rh.count == 0)

return -1;

}

}

if (sharedCount(c) == MAX_COUNT)

throw new Error("Maximum lock count exceeded");

if (compareAndSetState(c, c + SHARED_UNIT)) {

//获取成功,那么就尝试替换。

if (sharedCount(c) == 0) {

firstReader = current;

firstReaderHoldCount = 1;

} else if (firstReader == current) {

firstReaderHoldCount++;

} else {

if (rh == null)

rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current))

rh = readHolds.get();

else if (rh.count == 0)

readHolds.set(rh);

rh.count++;

cachedHoldCounter = rh; // cache for release

}

return 1;

}

}

}

接下来看读锁的释放:

由ReadLock的unlock方法。

调用Sync父类AQS的releaseShared(1) 方法。

调用Sync重写的tryReleaseShared 方法。

如果不成功,则调用父类的doReleaseShared 阻塞获取或者阻塞。

接下来看看子类重写的releaseShared方法:

//释放共享锁。

protected final boolean tryReleaseShared(int unused) {

Thread current = Thread.currentThread();

if (firstReader == current) {

//当前线程为firstReader时

// assert firstReaderHoldCount > 0;

if (firstReaderHoldCount == 1)

//直接把firstReader设为null

firstReader = null;

else

//重入数量-1

firstReaderHoldCount--;

} else {

//不是firstReader的话,那么就要从threadLocal里面操作相应的

HoldCounter rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current))

rh = readHolds.get();

int count = rh.count;

if (count

//说明没有重入锁了,直接把你这个线程变量从readHolds里面删除。

readHolds.remove();

if (count

//出错。

throw unmatchedUnlockException();

}

--rh.count;

}

for (;;) {

//阻塞性的CAS方法。

int c = getState();

int nextc = c - SHARED_UNIT;

if (compareAndSetState(c, nextc))

// Releasing the read lock has no effect on readers,

// but it may allow waiting writers to proceed if

// both read and write locks are now free.

return nextc == 0;

}

}

具体释放锁相关代码意思已经在注释中给出,这里主要看看三个变量,firstReader,firstReaderHoldCount,cachedHoldCounter。

前面说过,HoldCounter是线程的ThreadLocal变量,并且在每个获取读锁的线程都会有自己的HoldCounter,并且这些所有获取这个读锁的线程的HoldCounter,都被存到了这个读锁的ThreadLocal的map集合里面,那么每次读锁时候都要去通过hash算法去计算索引。其实这样是比较耗时的,特别是当多并发,虽然计算hash不用多耗时,但是并发下计算多次就很耗时了。所以记录了上面三个变量,在一定程度上起到了缓存的作用,避免了计算hash。

另一方面,firstReader和firstReaderHoldCounter也不会放到入到哪个ThreadLocal的map集合里面。

WriteLock

ReentrantReadWriteLock的写锁类似于ReentrantLock,具有排他性质,并且也具有公平锁和非公平锁的性质。

写锁的获取:

先看步骤:

1. WriteLock里面的lock方法。

2. lock方法里面通过Sync进入到父类的acquire 方法。

3. 然后父类的acquire方法,调用子类重写的tryAcquire方法,如果成功则获取锁,失败则新建一个Waiter,将该线程放入Waiter队列里面挂起等待。

接下来看子类tryAcquire方法的具体实现:

protected final boolean tryAcquire(int acquires) {

/*

* 1. 拥有者不是当前线程。

* 2. 数量满了。

*/

Thread current = Thread.currentThread();

int c = getState();

int w = exclusiveCount(c);

//有人已经获取了锁了。

if (c != 0) {

// (Note: if c != 0 and w == 0 then shared count != 0)

//上面那个note意思,如果没人获取锁,并且排他锁数量为0,所以

if (w == 0 || current != getExclusiveOwnerThread())

//不是当前线程

return false;

if (w + exclusiveCount(acquires) > MAX_COUNT)

//超出了65535

throw new Error("Maximum lock count exceeded");

// Reentrant acquire

setState(c + acquires);

return true;

}

//判断出来应该阻塞。

if (writerShouldBlock() ||

!compareAndSetState(c, c + acquires))

return false;

setExclusiveOwnerThread(current);

return true;

}

类似于ReentrantLock的流程,通过验证锁的状态,以及可重入的数量等。

写锁的释放:

先看看流程:

1. 执行WriteLock里面的unlock方法。

2. 在unlock方法里面,由Sync执行父类的release方法。

3. 父类再具体执行子类重写的tryRelease 方法。并唤醒下一个继任节点线程。

/**

* 释放锁。

*/

protected final boolean tryRelease(int releases) {

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

int nextc = getState() - releases;

//判断排他锁的重入数是否为0.

boolean free = exclusiveCount(nextc) == 0;

if (free)

setExclusiveOwnerThread(null);

setState(nextc);

return free;

}

具体的方法,就是利用CAS操作将state变量减少。并且判断锁的重入数量。

FairSync和NonfairSync

前面说过,ReentrantReadWriteLock里面,也有公平锁和非公平锁,但是它里面的实现却和ReentrantLock不同,这两个类里面,定义了两个相同的方法,用于判断是否需要阻塞:

NofairSync中:

static final class NonfairSync extends Sync {

private static final long serialVersionUID = -8159625535654395037L;

//写锁一般都能够获取。

final boolean writerShouldBlock() {

return false; // writers can always barge

}

final boolean readerShouldBlock() {

//一般就是,如果是排他锁性质,所以需要block。

return apparentlyFirstQueuedIsExclusive();

}

}

而在FairSync中:

/**

* Fair version of Sync

* 公平锁版本。判断是否需要block的情况就是,是否需要排队。

* 判断,AQS待获取资源节点是否有节点,也就是如果有,那么就需要等待,没有的话,时间片就是自己的。

*/

static final class FairSync extends Sync {

private static final long serialVersionUID = -2274990926593161451L;

final boolean writerShouldBlock() {

return hasQueuedPredecessors();

}

final boolean readerShouldBlock() {

return hasQueuedPredecessors();

}

}

State变量的拆分使用

最后看看一个比较有特点的点,因为ReentrantReadWriteLock,里面维护着两个锁,读锁写锁,但是里面却只有一个state变量来控制状态,这又是怎么实现的呢?

我们知道,一个int有4个字节,32位,没错,读锁写锁就是相应的利用这个32位字节int来表示的,这也就是为什么读锁写锁范围都是65535.

接下来看具体代码:

static final int SHARED_SHIFT = 16;

static final int SHARED_UNIT = (1

static final int MAX_COUNT = (1

static final int EXCLUSIVE_MASK = (1

/** Returns the number of shared holds represented in count */

//返回共享锁数量read

static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

/** Returns the number of exclusive holds represented in count */

//返回排他锁数量write

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

其中,高16位,用于读锁(共享锁),而低16位真是用于写锁,即低16位。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190127G0G24V00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券