前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >redisson分布式锁实现原理_redisson连接池

redisson分布式锁实现原理_redisson连接池

作者头像
全栈程序员站长
发布2022-11-03 17:11:43
4140
发布2022-11-03 17:11:43
举报

近期在处理程序有两个不同来源入口的时候,因为容易产生并发情况,造成会有脏数据产生,在同事推荐下使用redisson的锁来解决并发问题。 先上使用的一定程度封装的工具类:

工具类

代码语言:javascript
复制
@Service
public class RedissonManager { 

@Autowired
private RedissonClient redissonClient;
/** * 加锁 * * @param lockKey * @return */
public RLock lock(String lockKey) { 

RLock lock = redissonClient.getLock(lockKey);
lock.lock();
return lock;
}
/** * 释放锁 * * @param lockKey */
public void unlock(String lockKey) { 

RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}
/** * 释放锁 * * @param lock */
public void unlock(RLock lock) { 

lock.unlock();
}
/** * 带超时的锁 * * @param lockKey * @param timeout 超时时间 单位:秒 */
public RLock lock(String lockKey, int timeout) { 

RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, TimeUnit.SECONDS);
return lock;
}
/** * 带超时的锁 * * @param lockKey * @param unit 时间单位 * @param timeout 超时时间 */
public RLock lock(String lockKey, TimeUnit unit, int timeout) { 

RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, unit);
return lock;
}
/** * 尝试获取锁 * * @param lockKey * @param waitTime 最多等待时间 * @param leaseTime 上锁后自动释放锁时间 * @return */
public boolean tryLock(String lockKey, int waitTime, int leaseTime) { 

RLock lock = redissonClient.getLock(lockKey);
try { 

return lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
} catch (InterruptedException e) { 

return false;
}
}
/** * 尝试获取锁 * * @param lockKey * @param unit 时间单位 * @param waitTime 最多等待时间 * @param leaseTime 上锁后自动释放锁时间 * @return */
public boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) { 

RLock lock = redissonClient.getLock(lockKey);
try { 

return lock.tryLock(waitTime, leaseTime, unit);
} catch (InterruptedException e) { 

return false;
}
}
}

实际使用很简单,就是直接使用方法来锁住一个key,但是后续测试发现lock和tryLock是两种不同的情况。 lock是当获取锁失败时会阻塞当前进程,如果没有带参数设置过期时间则是30秒后自动解锁。 tryLock则是当获取锁失败时,当超过设置的等待时间时返回false

后面楼主出于好奇便看了一下redisson源码以及结合网上大神的见解,略为理解了一下,以此记录一下个人见解(不对请大家积极指出

lock

代码语言:javascript
复制
	private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { 

// 此处为获取当前线程id
long threadId = Thread.currentThread().getId();
// 核心代码见下图后继续回来走逻辑
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 此处得到获取锁的结果,正常获取锁则ttl为null,竞争锁时返回锁的过期时间 
if (ttl == null) { 

return;
}
// 此处为订阅锁释放事件,
// 如果当前线程通过 Redis 的 channel 订阅锁的释放事件获取得知已经被释放
// 则会发消息通知待等待的线程进行竞争.
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) { 

commandExecutor.syncSubscriptionInterrupted(future);
} else { 

commandExecutor.syncSubscription(future);
}
try { 

while (true) { 

// 此处循环重试获取锁,直至重新获取锁成功才跳出循环,
// 此种做法阻塞进程,一直处于等待锁手动释放或者超时才继续线程
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) { 

break;
}
// waiting for message
if (ttl >= 0) { 

try { 

future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { 

if (interruptibly) { 

throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else { 

if (interruptibly) { 

future.getNow().getLatch().acquire();
} else { 

future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally { 

// 最后释放订阅事件
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}

tryLockInnerAsync

代码语言:javascript
复制
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { 

return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

此段脚本为一段lua脚本: 结合个人理解其中的变量参数: KEY[1]: 为你加锁的lock值 ARGV[2]: 为线程id ARGV[1]: 为设置的过期时间

第一个if: 判断是否存在设置lock的key是否存在,不存在则利用redis的hash结构设置一个hash,值为1,并设置过期时间,后续返回锁。 第二个if: 判断是否存在设置lock的key是否存在,存在此线程的hash,则为这个锁的重入次数加1(将hash值+1),并重新设置过期时间,后续返回锁。 最后返回: 这个最后返回不是说最后结果返回,是代表以上两个if都没有进入,则代表处于竞争锁的情况,后续返回竞争锁的过期时间。

tryLock

trylock具有返回值,true或者false,表示是否成功获取锁。tryLock前期获取锁逻辑基本与lock一致,主要是后续获取锁失败的处理逻辑与lock不一致。

代码语言:javascript
复制
	 @Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { 

long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) { 

return true;
}
// 以上与lock逻辑一致
// 获取锁失败后,中途tryLock会一直判断中间操作耗时是否已经消耗锁的过期时间,如果消耗完则返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) { 

acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// 此处为订阅锁释放事件,
// 如果当前线程通过 Redis 的 channel 订阅锁的释放事件获取得知已经被释放
// 则会发消息通知待等待的线程进行竞争.
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 将订阅阻塞,阻塞时间设置为我们调用tryLock设置的最大等待时间,超过时间则返回false
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { 

if (!subscribeFuture.cancel(false)) { 

subscribeFuture.onComplete((res, e) -> { 

if (e == null) { 

unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try { 

time -= System.currentTimeMillis() - current;
if (time <= 0) { 

acquireFailed(waitTime, unit, threadId);
return false;
}
// 循环获取锁,但由于上面有最大等待时间限制,基本会在上面返回false
while (true) { 

long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) { 

return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) { 

acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) { 

subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else { 

subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) { 

acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally { 

unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}

结论

尽量在自己代码逻辑中添加解锁的逻辑,避免锁长时间存在浪费不必要的资源

综上所述,应尽量使用tryLock,且携带参数,因为可设置最大等待时间以及可及时获取加锁返回值,后续可做一些其他加锁失败的业务

以上是个人理解,如有不对,希望各位大神指出修改

在这里插入图片描述
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/181595.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022年10月15日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 工具类
  • lock
  • tryLock
  • 结论
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档