前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis实现分布式锁

Redis实现分布式锁

作者头像
naget
发布2021-01-08 15:32:51
8051
发布2021-01-08 15:32:51
举报
文章被收录于专栏:VegoutVegout

分布式锁初见

代码语言:javascript
复制
    public static int count = 0;
    private static int expireTime = 50;
    private static String lockKey = "desc";
    private static String lockValue = "lockValue";

    public void decr() {
        if (RedisPoolUtil.setNxPx(lockKey, lockValue, expireTime)) {
            try {
                //业务处理
            } catch (Exception e) {
                log.info("操作error:{}", e.getMessage(), e);
            } finally {
                RedisPoolUtil.del(lockKey);
            }
        }
    }

使用redis分布式锁重要的3个点:①使用redis提供的原子命令,对应jedis给的api是 public String set(String key, String value, String nxxx, String expx, long time) ②设置合适的锁过期时间③ 在finally块释放锁。三点里最难的就是第二点了,里边有一个”合适“,多长时间算是合适呢?跟业务处理时间有很大的关系,一旦设置时间不当,就会出现问题:业务未处理完时,锁过期了怎么办?下面这段代码可以复现这个问题:

代码语言:javascript
复制
 public void decr() {
        if (RedisPoolUtil.setNxPx(lockKey, lockValue, expireTime)) {
            try {
                if (count < 10) {
                    //业务处理...
                    Thread.sleep(10);
                    count++;
                }
                //模拟意外延迟
                if (System.currentTimeMillis() % 2 == 0) {
                    Thread.sleep(15);
                }
            } catch (Exception e) {
                log.info("操作error:{}", e.getMessage(), e);
            } finally {
                RedisPoolUtil.del(lockKey);
            }
        }
    }
     public void result() {
        log.info("拿到锁进行操作:{}",count);
    }

使用多线程模拟并发,调用该方法,最终输出count的值

代码语言:javascript
复制
 [main] INFO blog20201215.MyLock - 拿到锁进行操作:11

可以看到有count本应该最大增到10,但是因为锁过期,同时有不止一个线程进入了业务处理代码块,count增到了11,分布式锁失效。该情况会导致两个问题,一个是先进入代码块的线程A未完成业务操作,同时后来线程B进入,造成错误。二是后来的线程B未完成操作之前,先进入代码块的线程A会释放掉后进入的线程设置的分布式锁,导致恶性循环。

锁过期

所以我们可以将过期时间设置的长一点,防止过期。但是多长算长呢?很难把控,于是我们可以在拿到分布式锁之后,开始业务操作之前,启动一个线程给拿到锁的该线程”续命“,也就是只要该线程未完成业务处理,子线程就去增加分布式锁的过期时间。demo如下:

代码语言:javascript
复制
    public void decr1() {
        if (RedisPoolUtil.setNxPx(lockKey, "1", expireTime)) {
            Thread t = null;
            try {
             //开启续命线程
                t = daemon(Thread.currentThread());
                if (count < 10) {
                    //业务处理...
                    Thread.sleep(10);
                    count++;
                }
                //模拟意外延迟
                if (System.currentTimeMillis() % 2 == 0) {
                    Thread.sleep(15);
                }
            } catch (Exception e) {
                log.info("操作error:{}", e.getMessage(), e);
            } finally {
                if (Objects.nonNull(t)){
                //关续命线程
                    t.interrupt();
                }
                RedisPoolUtil.del(lockKey);
            }
        }
    }
     private Thread daemon(Thread parent){
        Thread t =  new Thread(new Runnable() {

            @Override
            public void run() {
                boolean end = false;
                do {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        log.info("{}终止了自己的辅助线程",parent.getName());
                        end = true;
                    }
                    if (!end){
                        log.info("给{}续命",parent.getName());
                        RedisPoolUtil.setEx(lockKey,parent.getName(),1);
                    }
                }while (parent.isAlive() && !end);
            }

        });
        t.start();
        return t;
    }

有了续命线程,锁不会提前过期,这样也就不会有刚才提到的第二个问题了(A线程释放了B线程的分布式锁)。但是这样的操作添加了系统的复杂性,也消耗了更多的资源。如果不使用续命线程,第二个问题该怎么解决呢?

解锁只能上锁人

A线程释放了B线程的锁,下面这个demo进行了复现:

代码语言:javascript
复制
    public void decrV2() {
        if (RedisPoolUtil.setNxPx(lockKey, Thread.currentThread().getName(), expireTime)) {
            try {
                if (count < 10) {
                    //业务处理...
                    Thread.sleep(10);
                    count++;
                }
                //模拟意外延迟
                if (System.currentTimeMillis() % 2 == 0) {
                    Thread.sleep(15);
                }
            } catch (Exception e) {
                log.info("操作error:{}", e.getMessage(), e);
            } finally {
                String value = RedisPoolUtil.get(lockKey);
                if (!Thread.currentThread().getName().equals(value)) {
                    log.info("线程{}释放了{}的锁",Thread.currentThread().getName(),value);
                }
                RedisPoolUtil.del(lockKey);
            }
        }
    }
    

模拟并发调用,可以看到部分输出如下:

代码语言:javascript
复制
[pool-1-thread-17] INFO blog20201215.MyLock - 线程pool-1-thread-17释放了pool-1-thread-59的锁
[pool-1-thread-59] INFO blog20201215.MyLock - 线程pool-1-thread-59释放了pool-1-thread-81的锁
[pool-1-thread-20] INFO blog20201215.MyLock - 线程pool-1-thread-20释放了pool-1-thread-12的锁
[pool-1-thread-58] INFO blog20201215.MyLock - 线程pool-1-thread-58释放了pool-1-thread-12的锁
[pool-1-thread-40] INFO blog20201215.MyLock - 线程pool-1-thread-40释放了pool-1-thread-86的锁
[pool-1-thread-50] INFO blog20201215.MyLock - 线程pool-1-thread-50释放了pool-1-thread-68的锁
[pool-1-thread-81] INFO blog20201215.MyLock - 线程pool-1-thread-81释放了pool-1-thread-68的锁
[pool-1-thread-85] INFO blog20201215.MyLock - 线程pool-1-thread-85释放了pool-1-thread-32的锁
[pool-1-thread-72] INFO blog20201215.MyLock - 线程pool-1-thread-72释放了pool-1-thread-32的锁
[pool-1-thread-64] INFO blog20201215.MyLock - 线程pool-1-thread-64释放了pool-1-thread-32的锁
[pool-1-thread-86] INFO blog20201215.MyLock - 线程pool-1-thread-86释放了pool-1-thread-32的锁
[pool-1-thread-87] INFO blog20201215.MyLock - 线程pool-1-thread-87释放了pool-1-thread-17的锁
[main] INFO blog20201215.MyLock - 拿到锁进行操作:12
[pool-1-thread-73] INFO blog20201215.MyLock - 线程pool-1-thread-73释放了pool-1-thread-52的锁
[pool-1-thread-95] INFO blog20201215.MyLock - 线程pool-1-thread-95释放了pool-1-thread-52的锁

解决这个问题,可以规定:解锁只能上锁人。A线程上的锁,只能A线程来释放。只要我们将lockKey的value设置成与该线程相关的值就可以了,在释放锁时进行判断,不是自己的锁,就不释放。这样可以保证不会产生恶心循环。

代码语言:javascript
复制
    public void decrV3() {
        if (RedisPoolUtil.setNxPx(lockKey, Thread.currentThread().getName(), expireTime)) {
            try {
                if (count < 10) {
                    //业务处理...
                    Thread.sleep(10);
                    count++;
                }
                //模拟意外延迟
                if (System.currentTimeMillis() % 2 == 0) {
                    Thread.sleep(15);
                }
            } catch (Exception e) {
                log.info("操作error:{}", e.getMessage(), e);
            } finally {
                String value = RedisPoolUtil.get(lockKey);
                if (!Thread.currentThread().getName().equals(value)) {
                    log.info("线程{}不可释放{}的锁",Thread.currentThread().getName(),value);
                }else {
                    RedisPoolUtil.del(lockKey);
                }
            }
        }
    }

以上代码中释放锁的代码还是有些问题,因为其中涉及到了两个redis操作①获取当前锁的value ②比较之后,再执行del操作。可能会发生一种情况,当执行完第一步之后,第二步还没来得及执行,锁过期了,其他线程设置了自己的锁,于是再次出现线程A释放线程B的情况。我们可以使用Lua脚本将这两个操作合为一个原子操作,如下:

代码语言:javascript
复制
public static boolean releaseDistributedLock(String lockKey, String value) {
        Jedis jedis = null;
        Object result;
        try{
            jedis = RedisPool.getJedis();
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value));
        }catch (Exception e) {
            log.error("releaseDistributedLock error key:{},value:{}", lockKey, value, e);
            RedisPool.returnBrokenResource(jedis);
            return false;
        }
        RedisPool.returnResource(jedis);
        if ("OK".equals(result)) {
            return true;
        }
        return false;
    }
    
    

解锁时调用以上这个方法就可以了

代码语言:javascript
复制
    public void decrV4() {
        if (RedisPoolUtil.setNxPx(lockKey, Thread.currentThread().getName(), expireTime)) {
            try {
                if (count < 10) {
                    //业务处理...
                    Thread.sleep(10);
                    count++;
                }
                //模拟意外延迟
                if (System.currentTimeMillis() % 2 == 0) {
                    Thread.sleep(15);
                }
            } catch (Exception e) {
                log.info("操作error:{}", e.getMessage(), e);
            } finally {
                if (!RedisPoolUtil.releaseDistributedLock(lockKey,Thread.currentThread().getName())) {
                    log.info("线程{}不可释放{}的锁",Thread.currentThread().getName(), RedisPoolUtil.get(lockKey));
                }
            }
        }
    }

再次模拟并发调用,输出如下:

代码语言:javascript
复制
[pool-1-thread-24] INFO blog20201215.MyLock - 线程pool-1-thread-24不可释放pool-1-thread-42的锁
[pool-1-thread-42] INFO blog20201215.MyLock - 线程pool-1-thread-42不可释放pool-1-thread-96的锁
[pool-1-thread-27] INFO blog20201215.MyLock - 线程pool-1-thread-27不可释放pool-1-thread-96的锁
[pool-1-thread-96] INFO blog20201215.MyLock - 线程pool-1-thread-96不可释放pool-1-thread-77的锁
[pool-1-thread-77] INFO blog20201215.MyLock - 线程pool-1-thread-77不可释放pool-1-thread-100的锁
[pool-1-thread-46] INFO blog20201215.MyLock - 线程pool-1-thread-46不可释放pool-1-thread-17的锁
[pool-1-thread-100] INFO blog20201215.MyLock - 线程pool-1-thread-100不可释放pool-1-thread-88的锁
[pool-1-thread-17] INFO blog20201215.MyLock - 线程pool-1-thread-17不可释放pool-1-thread-79的锁
[pool-1-thread-79] INFO blog20201215.MyLock - 线程pool-1-thread-79不可释放pool-1-thread-15的锁
[pool-1-thread-88] INFO blog20201215.MyLock - 线程pool-1-thread-88不可释放pool-1-thread-15的锁
[main] INFO blog20201215.MyLock - 拿到锁进行操作:10
[pool-1-thread-59] INFO blog20201215.MyLock - 线程pool-1-thread-59不可释放pool-1-thread-87的锁
[pool-1-thread-87] INFO blog20201215.MyLock - 线程pool-1-thread-87不可释放null的锁

可以看到,没出现什么问题,但是锁提前释放的问题任然存在。(当然分布式锁应用于分布式环境,目前我是单机测试,所以value采用了线程名称,更严谨一些应该生成唯一id,作为锁的value)

可重入锁

一些业务场景可能会需要分布式锁具有可重入性,可以使用通过ThreadLocal记录当前线程获取到的锁,记录获取次数,获取锁时先检查当前线程是否已经获取到了锁,如果没有,则通过redis去获取锁,拿到之后在本线程记录已获取到锁,次数为1;第二次获取锁时,如果发现已经获取到了锁,直接给获取次数加1就可以,同理在释放锁时也需要将次数标识进行递减,从而实现一个可重入锁。

redis挂了怎么办

生产环境,redis一般都是一个集群,有多台机器,当记录我们加锁的那台redis挂了怎么办?这种情况会导致,其他节点加锁成功,分布式锁失败。我想到两种方案:①redis集群采取主从架构,比如三主三从,当主节点挂了,从节点可以顶上,从而保证业务正常,但也有可能从节点没有将主节点的锁信息同步过去,这种情况只能采取第二种方法了。②加锁是采取RedLock的思路,加锁时,给集群中(n/2+1)个节点加锁成功才算获取锁成功,解锁时给redis集群中的所有机器解锁。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-01-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Vegout 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分布式锁初见
  • 锁过期
  • 解锁只能上锁人
  • 可重入锁
  • redis挂了怎么办
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档