前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一篇吃透Redis与Redisson分布式锁

一篇吃透Redis与Redisson分布式锁

原创
作者头像
Joseph_青椒
发布2023-08-06 11:58:18
4070
发布2023-08-06 11:58:18
举报
文章被收录于专栏:java_josephjava_joseph

这篇文章带大家吃透分布式锁

这次来点高级的东西,基础 不行的慎入奥,带大家吃透分布式锁那些事儿!

分布式锁由来

这里有必要点一下,来就来的透彻点,

比如一个库存扣减操作,redis扣减,jvm单机下,加synchronized是不会出现问题的,排队执行,

但是分布式下,即使jvm进程加了这样的重量级锁,还是会有问题,毕竟多个结点操作一个redis库存扣减,jvm进程无法

影响到其他的进程,这就有了分布式锁,

这里讲最常用的,redis中的setnx命令,(设置如果不存在---也就是意味着不存在才有能力加锁,这样的话就可以只有一个jvm

进程去拿到这把锁,然后执行扣减,这就是分布式锁。

redis原生分布式锁

注意这里不带从0-1各个问题的排除了,看博客即可,我在这只做部分巩固

这里带大家看一个分布式锁防止超领的例子,原生分布式锁实现

代码语言:javascript
复制
​
/**
* 原生分布式锁 开始
* 1、原子加锁 设置过期时间,防止宕机死锁
* 2、原子解锁:需要判断是不是自己的锁
*/
String uuid = CommonUtil.generateUUID();
String lockKey = "lock:coupon:"+couponId;
Boolean nativeLock=redisTemplate.opsForValue().setIfAbsent(lockKey,uuid,Duration.ofSeconds(30));
    if(nativeLock){
      //加锁成功
      log.info("加锁:{}",nativeLock);
      try {
           //执行业务  TODO
        }finally {
           String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
​
                Integer result = redisTemplate.execute(new DefaultRedisScript<>(script, Integer.class), Arrays.asList(lockKey), uuid);
                log.info("解锁:{}",result);
            }
​
        }else {
            //加锁失败,睡眠100毫秒,自旋重试
            try {
                TimeUnit.MILLISECONDS.sleep(100L);
            } catch (InterruptedException e) { }
            return addCoupon( couponId, couponCategory);
        }
        //原生分布式锁 结束

这里点几个点要注意下,setnx和expire不是原子性的,但是redisTemplate提供的setIfAbsent的重载方法可以传参过期时间!

这样就保证了原子性,(注意这里的原子性和数据库的原子性不一样,指的是程序的不可再分)

好,那么,误删问题是什么?大家一定听过, 这里就是finally中lua脚本的东西,这里lua脚本是保证判断和删除原子性的,

下面说。

那么这里着重强调下,删除这里的uuid有啥用,

这里的uuid其实就是为误删问题做准备的,误删,说白了就是任务1执行时间长,expire时间到了,但是任务还在执行,锁已经没了

任务2是可以拿到锁的,但是任务1一执行完,哎,给删了,把任务2的给删了,其他任务又能拿到锁了,这就是误删问题,

误删问题的解决 就是靠这里的uuid,加锁的时候,value设置uuid,当前线程获取uuid,每个任务删除的时候必须和这个jvm实例的uuid匹配,才能删除,不能删除别人的。

好,这里说下lua脚本的作用,保证判断和删除的原子性,为何要保证原子性

加入expire10s,执行到判断,是个9.99s 这时候还未删除,但是key过期了,并发量很大,别的线程在这时间一拿到锁,但是判断条件已经过去了,直接删除,就误删了,和之前一样,误删问题,所以采用lua脚本解决误删问题

但是锁过期是无法解决的,锁过期之后,别的线程就可以拿到了,就需要一种机制叫锁续命

锁续命

如何给锁续命?这里就是在拿到锁之后,添加个延迟任务,假设30s的expire时间,延迟任务要小于30,比如20

20秒一到,就需要从新设置expire时间,当然现在已经做到很好的开源实现了,redisson,类似于jedis

我们做好对redisson的使用,和理顺他的内部逻辑就好了,玩好他的架构思想多么牛掰。

now,先看怎么使用的,

代码语言:javascript
复制
String lockKey = "lock:coupon:"+couponId+":"+loginUser.getId();
RLock rLock = redissonClient.getLock((lockKey));
//多个线程进入,会阻塞等待释放锁
rLock.lock();
log.info("领卷接口加锁成功:{}",Thread.currentThread().getId());
try{
    //业务逻辑
}finally {
    rLock.unlock();
    log.info("解锁成功");
}

源码剖析:redisson流程

image-20230614232603460
image-20230614232603460

记住这个图,设计的很巧妙。先带大家主要过一遍,我们举两个线程为例,都去Redis服务端尝试去获取锁,一个线程拿到锁,另一个线程尝试加锁需要自旋去获取这把锁,;加锁这里有watch dog机制,能给锁续命,自旋这里会有阻塞、唤醒机制保证高性能,

现在从这两点带大家剖析下源码。

watch dog:

弄清watch dog机制,首先看如何加锁的,

先不进行健壮性考虑,直接进入核心逻辑

image-20230615155714020
image-20230615155714020
image-20230615155824133
image-20230615155824133

这块lua脚本是这样子的,上面试加锁逻辑,下面是实现可重入

lua脚本语法就不讲了,过于基础,看下面三个参数,

KEYS【1】<=>getName()就是获取key,这个name就是加锁的时候,自定义的锁的名字,通过这可以觉得锁的粒度,比如

是优惠劵级别还是到用户级别 String lockKey = "lock:coupon:"+couponId+":"+loginUser.getId();

ARg[2]<=>getLockName()这里就是hset的设置的value中的key,这里传入threadId,其实执行下去是一个uuid+threadId

这个参数2其实就相当于原生分布式锁的uuid,咱刚才用来防止误删使用的

ARG[1]<=>internallLockLeaseTime,就是超时时间,对应的值是lockWatchdogTimeout,看门狗超时时间,默认是30s

先分析上面这块代码,

一开始,获取key,肯定是没有的,这个锁没有呢还,然后用hset去加锁,至于这个1,一会儿讲

然后就是配置过期时间了,配置的是参数1的30s

下半拉代码:

这里就比较简单了,实际上就是利用hash结构去实现可重入锁,

如果已经加了锁,还想去拿这把锁,判断hash结构的参数2存在不存在,是的话自增,再续期,这里是可以无限可重入,来就续期

注意这里return 了一个pttl,过期时间,这里的return记住,会用的

到这里,加锁就结束了。

激动人心的时刻到了,就是咱刚刚原生分布式锁,缺少的续命机制,watch dog!

我们再看tryAcquireAsync方法,这个方法最好记住

image-20230615171022716
image-20230615171022716

tryAcquireAsnc,加锁的逻辑返回的是Future这是异步去加锁的,

异步执行完会去回调用addListener方法

直接讲核心的,就是这个future到ttl(就是刚才lua脚本的return出来的),ttl就是剩余时间

注意加锁成功的话,是直接返回null的,这个ttl是其他线程自旋使用的,防止不断自旋占用cpu,下面会讲

我们这里先剖析加锁和续命逻辑

如果ttl为null,加锁成功,就可以续命了!

看代码中的,scheduleExpirationRenewal(threadId),时间延期刷新,这个方法就是锁续命的核心逻辑了

好,我们进去看一下

代码语言:javascript
复制
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) throws Exception {
        
        RFuture<Boolean> future = renewExpirationAsync(threadId);
        
        future.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                expirationRenewalMap.remove(getEntryName());
                if (!future.isSuccess()) {
                    log.error("Can't update lock " + getName() + " expiration", future.cause());
                    return;
                }
                
                if (future.getNow()) {
                    // reschedule itself
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
    }
​
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

这里是个延迟任务,延迟多久,看最下面,30/3=10 ,10s后去执行run方法

好看下renewExpirationAsync(threadId);的逻辑

代码语言:javascript
复制
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        internalLockLeaseTime, getLockName(threadId));
}

这里就是和之前一样了,通过uuid+threadId标识判断这个线程结束了没,==1的时候,主线程未结束,锁还没是否的,

就expire重新设置时间。但是定时任务的续命怎么做的???就是万一线程还在执行好久好久,时间还不够,

注意这段lua脚本返回的1,

image-20230615174206889
image-20230615174206889

这里续命成功之后,又去调用刷新方法了!!!递归嘛,又是等10s ,好,这种方法嵌套调用是中间件里很常用的一种定时任务实现。!

这种方法嵌套实现定时任务,很重要,很牛掰,很快,中间件源码很常见。方法嵌套实现定时任务!重要的事情说n遍

好,这里简洁的回顾下锁续命的逻辑:

首先,加锁的时候是异步去加的,加锁成功,直接teturn null,ttl==null的话,会异步进行续命,这个续命刷新方法是个TimerTask异步延迟任务,延迟expire的三分之一时间,

去判断锁是否释放,没释放的话就会续期,然后续期成功之后还会嵌套调用实现定时任务。

自旋加锁

我们原生分布式锁,加不到锁是会直接放弃的!!那么这样对用户的体验太不好了

好,我们记得,当时加锁逻辑是不是提到一个ttl,这个ttl就是剩余锁寿命,其他线程加锁失败,就会把剩余的ttl返回出去

image-20230615181025158
image-20230615181025158

我们去tryAcquireAsync上一级,看到返回的ttl,我们知道ttl是加锁失败才会返回的,去处理加锁失败的事情,

好,从代码中看到这个ttl==null直接return出去,啥也没干,往下翻,

image-20230615181225955
image-20230615181225955

看这里的while循环,就是自旋加锁,但是这个自旋并不是一直加的,根据就是根据ttl,

这里根据并发编程的东西进行等待,这里过于基础,不去讲了,知道等待ttl的时间,比如30s,执行了5s,返回25s,

第二个线程就要等待25s,

but,我们知道,你阻塞,一定要有配套的唤醒的,你阻塞人家25s,下一秒线程执行完了,白等24s?这样实现的也就太鸡肋了

所以这里阻塞和唤醒要同时实现的思想要牢记在心

那好,知道要有唤醒的设施了,redis怎么去实现的?

publish subscribe 发布订阅模型,这里理解成一个队列也可,在redis中叫做信道channel

这里有必要 点一下,原生分布式锁判断这个线程的唯一标识是用过uuid,redisson中靠的是uuid+threadId

还是看tryAcquire中的逻辑

image-20230615195856620
image-20230615195856620

这里加锁逻辑中式订阅了一个信道的,订阅信道目的是唤醒线程停止阻塞

那么好,如果是你,什么时候去发布一个消息到信道?当然是解锁的时候,解锁的时候发布,然后被其他线程加锁逻辑接受,立马去

获取锁,点一下,从这里可以看出,这里是非公平锁,谁抢到算谁的,不分先来后到

解锁逻辑:

image-20230615202824556
image-20230615202824556
代码语言:javascript
复制
"if (redis.call('exists', KEYS[1]) == 0) then " +
    "redis.call('publish', KEYS[2], ARGV[1]); " +
    "return 1; " +
"end;" +
判断锁在不在,不在的话,说明执行完了,发布解锁消息
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " 
 锁在,要解锁,但是要判断是不是他的锁,参数3是uuid+threadId,这里也是我们原生分布式锁中finally中判断逻辑,防止误删
 (当然误删场景是在锁过期之后发生的,看门狗机制这里应该不会发生,但是不是只有看门狗机制的,有直接配置过期时间,不用看门狗机制
 下面会讲)
 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
 "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
    //counter是什么。还记得加锁时候的hset吗,hash结构也有key value,key是uuid+threadId,vlaue我们设置为1了
    但是上面还有可重入锁的设计,1自增为2,那么解锁的时候,也要自增-1,hincry -1,等于0的时候,才算锁用完了,
    不等于0,直接return什么也不做,等待其他线程解锁,知道hash的value==0.才执行publish,发布消息到信道 

好,解锁发消息到信道已经打通了,那么阻塞的线程,他们怎么知道去哪消费

唤醒逻辑:

解锁逻辑中的unlockMessage记得吗,值是0,这发消息相当于一个标识而已,publish就是发了一个标识,类似消息MQ的一对多订阅

一堆阻塞的线程去监听一个队列,在redisson中,就是一堆线程等带解锁publish发布消息,然后去回调onMessage去解锁

image-20230615205925553
image-20230615205925553

还记得怎么阻塞的吗?就是通过并发编程的信号量,这个是加锁逻辑ttl存在时候做的,这里让这些订阅信道的这堆线程去抢

然后释放,就做到了唤醒!,就可以去拿了,这些阻塞的线程不分先后,所以是非公平自旋锁。

注意点:

redisson使用锁续命

代码语言:javascript
复制
String lockKey = "lock:coupon:"+couponId+":"+loginUser.getId();
RLock rLock = redissonClient.getLock((lockKey));
//多个线程进入,会阻塞等待释放锁
rLock.lock();

我们看这个代码,lock上面的注释,写的是会阻塞等待释放锁,现在好理解了吧?

so easy!,but,传入过期时间

image-20230615210929082
image-20230615210929082

这里是不会使用看门狗机制给锁续命的,也就是说,你默认30s,人家帮你续命,你修改,就不管了

当然你也可以自定义redisson配置纳入ioc,修改默认过期时间

还有一个点

代码语言:javascript
复制
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)

这里tryLock,看到waitTime没,最多等待这么多秒,也就是说不会去自旋,同样leaseTIme,也不会去帮你续命。

加餐福利:可重入锁生产通用模型

我们加锁逻辑提到了可重入锁,那么可重入锁在生产中咋用?我这里

代码语言:javascript
复制
//key1是短链码,ARGV[1]是accountNo,ARGV[2]是过期时间
String script = "if redis.call('EXISTS',KEYS[1])==0 then redis.call('set',KEYS[1],ARGV[1]); redis.call('expire',KEYS[1],ARGV[2]); return 1;" +
                " elseif redis.call('get',KEYS[1]) == ARGV[1] then return 2;" +
                " else return 0; end;";
​
Long result = redisTemplate.execute(new
                DefaultRedisScript<>(script, Long.class), Arrays.asList(code), accountNO,100);

上面我们看到hset实现的可重入锁,那么生产中,想要实现分布式可重入锁也用hset吗,有没有简单些的,

hest中,是可以无限自增的,但是生产应用中,要结合场景定义,上面例子

只适合特定的场景,这个场景是对于code作为key下,对某个用户可实现可重入

就是code是一个业务,防止code并发冲突,把code作为锁粒度key,但是对于某个用户,accountNO一样,我是随便操作的。无限的可重入。

那么可想而知,这里的可重入锁不涉及续期,自旋的,适用于特殊场景,特定场景 特定实现,这是分析给大家的一个通用模型。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分布式锁由来
  • redis原生分布式锁
  • 锁续命
    • 源码剖析:redisson流程
      • watch dog:
        • 自旋加锁
          • 注意点:
          • 加餐福利:可重入锁生产通用模型
          相关产品与服务
          云数据库 Redis
          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档