关于读写锁,大家应该都了解JDK中的ReadWriteLock
, 当然Redisson也有读写锁的实现。
所谓读写锁,就是多个客户端同时加读锁,是不会互斥的,多个客户端可以同时加这个读锁,读锁和读锁是不互斥的
Redisson中使用RedissonReadWriteLock
来实现读写锁,它是RReadWriteLock
的子类,具体实现读写锁的类分别是:RedissonReadLock
和RedissonWriteLock
还是从官方文档中找的使用案例:
RReadWriteLock rwlock = redisson.getReadWriteLock("tryLock"); RLock lock = rwlock.readLock(); // or RLock lock = rwlock.writeLock(); // traditional lock method lock.lock(); // or acquire lock and automatically unlock it after 10 seconds lock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
public class RedissonReadLock extends RedissonLock implements RLock { @Override <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'read'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('set', KEYS[2] .. ':1', 1); " + "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local key = KEYS[2] .. ':' .. ind;" + "redis.call('set', key, 1); " + "redis.call('pexpire', key, ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end;" + "return redis.call('pttl', KEYS[1]);", Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)); } }
注: 以下文章中客户端A用:UUID_01:threadId_01标识 客户端B用:UUID_02:threadId_02标识
KEYS:
getName()
= tryLockgetReadWriteTimeoutNamePrefix(threadId)
= {anyLock}:UUID_01:threadId_01:rwlock_timeoutARGV:
接着对代码中lua脚本一行行解读:
此时redis中存在的数据结构为:
anyLock: { "mode": "read", "UUID_01:threadId_01": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
继续分析,客户端A已经加过读锁,此时如果继续加读锁会怎样处理呢?
此时redis中存在的数据结构为:
anyLock: { “mode”: “read”, “UUID_01:threadId_01”: 2 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1 {anyLock}:UUID_01:threadId_01:rwlock_timeout:2 1
基本步骤和上面一直,加锁后redis中数据为:
anyLock: { "mode": "read", "UUID_01:threadId_01": 2, "UUID_02:threadId_02": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1 {anyLock}:UUID_01:threadId_01:rwlock_timeout:2 1 {anyLock}:UUID_02:threadId_02:rwlock_timeout:1 1
这里需要注意一下: 为哈希表 key 中的域 field 的值加上增量 increment,如果 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令。
Redisson中由RedissonWriteLock
来实现写锁,我们看下写锁的核心逻辑:
public class RedissonWriteLock extends RedissonLock implements RLock { @Override <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'write'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (mode == 'write') then " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " + "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " + "return nil; " + "end; " + "end;" + "return redis.call('pttl', KEYS[1]);", Arrays.<Object>asList(getName()), internalLockLeaseTime, getLockName(threadId)); } }
还是像上面一样,一行行来分析每句lua脚本执行语义。
KEYS和ARGV参数:
此时redis中数据格式为:
anyLock: { "mode": "write", "UUID_01:threadId_01:write": 1 }
此时再次来加写锁,直接到另一个if语句中:
此时redis中数据格式为:
anyLock: { "mode": "write", "UUID_01:threadId_01:write": 2 }
读锁加完后,此时redis数据格式为:
anyLock: { "mode": "read", "UUID_01:threadId_01": 1, "UUID_02:threadId_02": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1 {anyLock}:UUID_02:threadId_02:rwlock_timeout:1 1
客户端C参数为:
hget anyLock mode,mode = read,已经有人加了读锁,不是写锁,此时会直接执行:pttl anyLock,返回一个anyLock的剩余生存时间
客户端C加锁失败,就会不断的尝试重试去加锁
加完写锁后此时Redis数据格式为:
anyLock: { "mode": "write", "UUID_01:threadId_01:write": 1 }
客户端B执行读锁逻辑参数为:
接着看下加锁逻辑:
image.png
如上图,客户端B加读锁会走到红框中的if逻辑:
还是接着上面的逻辑,继续分析:
此时redis中数据格式为:
anyLock: { "mode": "write", "UUID_01:threadId_01:write": 1, "UUID_01:threadId_01": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
客户端A加读锁后,redis中数据结构为:
anyLock: { "mode": "read", "UUID_01:threadId_01": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
此时客户端A再来加写锁,逻辑如下:
image.png
此时客户端A先加的读锁,mode=read,所以再次加写锁是不能成功的
如果是同一个客户端同一个线程,先加了一次写锁,然后加读锁,是可以加成功的,默认是在同一个线程写锁的期间,可以多次加读锁
而同一个客户端同一个线程,先加了一次读锁,是不允许再被加写锁的
显然还有写锁与写锁互斥的逻辑就不分析了,通过上面一些场景的分析,我们可以知道:
例如客户端A先加读锁,然后再次加读锁 最后客户端B来加读锁
此时Redis中数据格式为:
anyLock: { "mode": "read", "UUID_01:threadId_01": 2, "UUID_02:threadId_02": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1 {anyLock}:UUID_01:threadId_01:rwlock_timeout:2 1 {anyLock}:UUID_02:threadId_02:rwlock_timeout:1 1
接着我们看下释放锁的核心代码:
public class RedissonReadLock extends RedissonLock implements RLock { @Override protected RFuture<Boolean> unlockInnerAsync(long threadId) { String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " + "if (lockExists == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter == 0) then " + "redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" + "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " + "if (redis.call('hlen', KEYS[1]) > 1) then " + "local maxRemainTime = -3; " + "local keys = redis.call('hkeys', KEYS[1]); " + "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + "maxRemainTime = math.max(remainTime, maxRemainTime);" + "end; " + "end; " + "end; " + "if maxRemainTime > 0 then " + "redis.call('pexpire', KEYS[1], maxRemainTime); " + "return 0; " + "end;" + "if mode == 'write' then " + "return 0;" + "end; " + "end; " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; ", Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), LockPubSub.unlockMessage, getLockName(threadId)); } }
客户端A来释放锁: 对应的KEYS和ARGV参数为:
接下来开始执行操作:
此时Redis中的数据结构为:
anyLock: { "mode": "read", "UUID_01:threadId_01": 1, "UUID_02:threadId_02": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1 {anyLock}:UUID_02:threadId_02:rwlock_timeout:1 1
此时继续往下,具体逻辑如图:
image.png
这个for循环的含义是获取到了所有的timeout key的最大的一个剩余生存时间,假设最大的剩余生存时间是25000毫秒
客户端A继续来释放锁:
此时客户端A执行流程还会和上面一直,执行完成后Redis中数据结构为:
anyLock: { "mode": "read", "UUID_02:threadId_02": 1 } {anyLock}:UUID_02:threadId_02:rwlock_timeout:1 1
因为这里会走counter == 0
的逻辑,所以会执行"redis.call('hdel', KEYS[1], ARGV[2]); "
客户端B继续来释放锁:
客户端B流程也和上面一直,执行完后就会删除anyLock这个key
上面已经分析过这种情形,操作过后Redis中数据结构为:
anyLock: { "mode": "write", "UUID_01:threadId_01:write": 1, "UUID_01:threadId_01": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
此时客户端A来释放读锁:
此时Redis中数据变成:
anyLock: { "mode": "write", "UUID_01:threadId_01:write": 1 }
先看下写锁释放的核心逻辑:
public class RedissonWriteLock extends RedissonLock implements RLock { @Override protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (mode == 'write') then " + "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " + "if (lockExists == 0) then " + "return nil;" + "else " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "if (redis.call('hlen', KEYS[1]) == 1) then " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "else " + // has unlocked read-locks "redis.call('hset', KEYS[1], 'mode', 'read'); " + "end; " + "return 1; "+ "end; " + "end; " + "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); } }
客户端A加两次写锁释放:
此时Redis中数据为:
anyLock: { "mode": "write", "UUID_01:threadId_01:write": 2, "UUID_01:threadId_01": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
客户端A来释放锁KEYS和ARGV参数:
直接分析lua代码:
此时Redis中数据:
anyLock: { "mode": "write", "UUID_01:threadId_01": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
后续还会接着判断,如果count=0,代表写锁都已经释放完了,此时hlen如果>1,代表加的还有读锁,所以接着执行:hset anyLock mode read
, 将写锁转换为读锁
最终Redis数据为:
anyLock: { "mode": "read", "UUID_01:threadId_01": 1 } {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
Redisson陆续也更新了好几篇了,疫情期间宅在家里一直学习Redisson相关内容,这篇文章写了2天,从早到晚。
读写锁这块内容真的很多,本篇篇幅很长,如果学习本篇文章最好跟着源码一起读,后续还会继续更新Redisson相关内容,如有不正确的地方,欢迎指正!
本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句