前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【分布式锁】01-使用Redisson实现可重入分布式锁原理

【分布式锁】01-使用Redisson实现可重入分布式锁原理

作者头像
一枝花算不算浪漫
发布2020-03-20 11:27:12
2.8K0
发布2020-03-20 11:27:12
举报

前言

主流的分布式锁一般有三种实现方式:

  1. 数据库乐观锁
  2. 基于Redis的分布式锁
  3. 基于ZooKeeper的分布式锁

之前我在博客上写过关于mysql和redis实现分布式锁的具体方案:

https://cloud.tencent.com/developer/article/1393488

里面主要是从实现原理出发。

这次【分布式锁】系列文章主要是深入redis客户端reddision源码和zk 这两种分布式锁的实现原理。

可靠性

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
  4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

Redisson加锁原理

redisson是一个非常强大的开源的redis客户端框架, 官方地址:

https://redisson.org/

使用起来很简单,配置好maven和连接信息,这里直接看代码实现:

代码语言:javascript
复制
1RLock lock = redisson.getLock("anyLock");
2
3lock.lock();
4lock.unlock();

redisson具体的执行加锁逻辑都是通过lua脚本来完成的,lua脚本能够保证原子性。

先看下RLock初始化的代码:

代码语言:javascript
复制
 1public class Redisson implements RedissonClient {
 2
 3    @Override
 4    public RLock getLock(String name) {
 5        return new RedissonLock(connectionManager.getCommandExecutor(), name);
 6    }
 7}
 8
 9public class RedissonLock extends RedissonExpirable implements RLock {
10    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
11    super(commandExecutor, name);
12    this.commandExecutor = commandExecutor;
13    this.id = commandExecutor.getConnectionManager().getId();
14    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
15    this.entryName = id + ":" + name;
16}

首先看下RedissonLock 的id返回的是一个UUID对象,每个机器都对应一个自己的id属性,id 值就类似于:"8743c9c0-0795-4907-87fd-6c719a6b4586"

接着往后看lock()的代码实现:

代码语言:javascript
复制
 1public class RedissonLock extends RedissonExpirable implements RLock {
 2    @Override
 3    public void lock() {
 4        try {
 5            lockInterruptibly();
 6        } catch (InterruptedException e) {
 7            Thread.currentThread().interrupt();
 8        }
 9    }
10
11    @Override
12    public void lockInterruptibly() throws InterruptedException {
13        lockInterruptibly(-1, null);
14    }
15
16    @Override
17    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
18        // 获取当前线程id
19        long threadId = Thread.currentThread().getId();
20        Long ttl = tryAcquire(leaseTime, unit, threadId);
21        // lock acquired
22        if (ttl == null) {
23            return;
24        }
25
26        RFuture<RedissonLockEntry> future = subscribe(threadId);
27        commandExecutor.syncSubscription(future);
28
29        try {
30            while (true) {
31                ttl = tryAcquire(leaseTime, unit, threadId);
32                // lock acquired
33                if (ttl == null) {
34                    break;
35                }
36
37                // waiting for message
38                if (ttl >= 0) {
39                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
40                } else {
41                    getEntry(threadId).getLatch().acquire();
42                }
43            }
44        } finally {
45            unsubscribe(future, threadId);
46        }
47    }
48
49    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
50        internalLockLeaseTime = unit.toMillis(leaseTime);
51
52        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
53                  "if (redis.call('exists', KEYS[1]) == 0) then " +
54                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
55                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
56                      "return nil; " +
57                  "end; " +
58                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
59                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
60                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
61                      "return nil; " +
62                  "end; " +
63                  "return redis.call('pttl', KEYS[1]);",
64                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
65    }
66}

这里省略了一些中间代码,这里主要看tryAcquire() 方法,这里传递的过期时间为-1,然后就是当前的线程id,接着就是核心的lua脚本执行流程,我们来一步步看看是如何执行的:

代码语言:javascript
复制
1"if (redis.call('exists', KEYS[1]) == 0) then " +
2  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
3  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
4  "return nil; " +
5"end; " +

KEYS[1] 参数是:“anyLock”

ARGV[2] 是:“id + ":" + threadId”

首先用的exists 判断redis中是否存在当前key,如果不存在就等于0,然后执行hset指令,将“anyLock id:threadId 1”存储到redis中,最终redis存储的数据类似于:

代码语言:javascript
复制
1{
2  "8743c9c0-0795-4907-87fd-6c719a6b4586:1":1
3}

偷偷说一句,最后面的一个1 是为了后面可重入做的计数统计,后面会有讲解到。

接着往下看,然后使用pexpire设置过期时间,默认使用internalLockLeaseTime为30s。最后返回为null,即时加锁成功。

Redisson 可重入原理

我们看下锁key存在的情况下,同一个机器同一个线程如何加锁的?

代码语言:javascript
复制
1"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
2  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
3  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
4  "return nil; " +
5"end; " +
6"return redis.call('pttl', KEYS[1]);",

ARGV[2] 是:“id + ":" + threadId”

如果同一个机器同一个线程再次来请求,这里就会是1,然后执行hincrby, hset设置的value+1 变成了2,然后继续设置过期时间。

同理,一个线程重入后,解锁时value - 1

Redisson watchDog原理

如果一个场景:现在有A,B在执行业务,A加了分布式锁,但是生产环境是各种变化的,如果万一A锁超时了,但是A的业务还在跑。而这时由于A锁超时释放,B拿到锁,B执行业务逻辑。这样分布式锁就失去了意义?

所以Redisson 引入了watch dog的概念,当A获取到锁执行后,如果锁没过期,有个后台线程会自动延长锁的过期时间,防止因为业务没有执行完而锁过期的情况。

我们接着来看看具体实现:

代码语言:javascript
复制
 1private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
 2    if (leaseTime != -1) {
 3        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
 4    }
 5    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
 6    ttlRemainingFuture.addListener(new FutureListener<Long>() {
 7        @Override
 8        public void operationComplete(Future<Long> future) throws Exception {
 9            if (!future.isSuccess()) {
10                return;
11            }
12
13            Long ttlRemaining = future.getNow();
14            // lock acquired
15            if (ttlRemaining == null) {
16                scheduleExpirationRenewal(threadId);
17            }
18        }
19    });
20    return ttlRemainingFuture;
21}

当我们tryLockInnerAsync执行完之后,会添加一个监听器,看看监听器中的具体实现:

代码语言:javascript
复制
 1protected RFuture<Boolean> renewExpirationAsync(long threadId) {
 2    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
 3            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
 4                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
 5                "return 1; " +
 6            "end; " +
 7            "return 0;",
 8        Collections.<Object>singletonList(getName()), 
 9        internalLockLeaseTime, getLockName(threadId));
10}

这里面调度任务每隔10s钟执行一次,lua脚本中是续约过期时间,使得当前线程持有的锁不会因为过期时间到了而失效

01_redisson watchdog_.png

Redisson 互斥性原理

还是看上面执行加锁的lua脚本,最后会执行到:

代码语言:javascript
复制
1"return redis.call('pttl', KEYS[1]);",

返回锁还有多久时间过期,我们继续接着看代码:

代码语言:javascript
复制
 1@Override
 2public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
 3    long threadId = Thread.currentThread().getId();
 4    Long ttl = tryAcquire(leaseTime, unit, threadId);
 5    // 返回ttl说明加锁成功,不为空则是加锁失败
 6    if (ttl == null) {
 7        return;
 8    }
 9
10    RFuture<RedissonLockEntry> future = subscribe(threadId);
11    commandExecutor.syncSubscription(future);
12
13    try {
14        // 死循环去尝试获取锁
15        while (true) {
16            // 再次尝试加锁
17            ttl = tryAcquire(leaseTime, unit, threadId);
18            // 如果ttl=null说明抢占锁成功
19            if (ttl == null) {
20                break;
21            }
22
23            // ttl 大于0,抢占锁失败,这个里面涉及到Semaphore,后续会讲解
24            if (ttl >= 0) {
25                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
26            } else {
27                getEntry(threadId).getLatch().acquire();
28            }
29        }
30    } finally {
31        unsubscribe(future, threadId);
32    }
33}

Redisson锁释放原理

直接看lua代码:

代码语言:javascript
复制
 1protected RFuture<Boolean> unlockInnerAsync(long threadId) {
 2    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
 3        // 判断锁key值是否存在
 4        "if (redis.call('exists', KEYS[1]) == 0) then " +
 5            "redis.call('publish', KEYS[2], ARGV[1]); " +
 6            "return 1; " +
 7        "end;" +
 8        // 判断当前机器、当前线程id对应的key是否存在
 9        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
10            "return nil;" +
11        "end; " +
12        // 计数器数量-1 可重入锁
13        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
14        // 如果计数器大于0,说明还在持有锁
15        "if (counter > 0) then " +
16            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
17            "return 0; " +
18        "else " +
19            // 使用del指令删除key
20            "redis.call('del', KEYS[1]); " +
21            "redis.call('publish', KEYS[2], ARGV[1]); " +
22            "return 1; "+
23        "end; " +
24        "return nil;",
25        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
26}

总结

一图总结:

01_redission 可重入锁实现原理.jpg

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-03-19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 可靠性
  • Redisson加锁原理
  • Redisson 可重入原理
  • Redisson watchDog原理
  • Redisson 互斥性原理
  • Redisson锁释放原理
  • 总结
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档