前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redission分布式锁加锁原理源码解析

Redission分布式锁加锁原理源码解析

作者头像
IT云清
发布2020-01-13 16:29:06
1.4K0
发布2020-01-13 16:29:06
举报
文章被收录于专栏:IT云清IT云清
追踪一下redission加锁的实现源码,并详细介绍核心加锁代码lua脚本的执行原理和过程。

1.获取锁

这里是我们自己实现,调用redission的方法,获取锁,然后加锁。lock.lock(expireTime, timeUnit)是关键,我们追进去。

代码语言:javascript
复制
    /**
     * 获取锁,如果没有主动调用unlock解锁,expireTime后会自动释放
     * @param lockKey
     * @param expireTime 如果没有调用unlock解锁,expireTime 后自动释放
     * @param timeUnit 时间单位
     * @return
     */
    public RLock lock(String lockKey,Integer expireTime,TimeUnit timeUnit){
        RLock lock = redisson.getLock(lockKey);
        lock.lock(expireTime, timeUnit);
        logger.info("【Redisson lock】success to acquire lock for [ "+lockKey+" ],expire time:"+expireTime+timeUnit);
        return lock;
    }

进入 lock.lock(expireTime, timeUnit);

代码语言:javascript
复制
    /**
     * Acquires the lock.
     *
     * <p>If the lock is not available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until the
     * lock has been acquired.
     *
     * If the lock is acquired, it is held until <code>unlock</code> is invoked,
     * or until leaseTime milliseconds have passed
     * since the lock was granted - whichever comes first.
     *
     * @param leaseTime the maximum time to hold the lock after granting it,
     *        before automatically releasing it if it hasn't already been released by invoking <code>unlock</code>.
     *        If leaseTime is -1, hold the lock until explicitly unlocked.
     * @param unit the time unit of the {@code leaseTime} argument
     *
     */
    void lock(long leaseTime, TimeUnit unit);

进入实现方法

代码语言:javascript
复制
    @Override
    public void lock(long leaseTime, TimeUnit unit) {
        try {
            lockInterruptibly(leaseTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

进入 lockInterruptibly(leaseTime, unit);

代码语言:javascript
复制
    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

进入Long ttl = tryAcquire(leaseTime, unit, threadId);

代码语言:javascript
复制
    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }

进入

代码语言:javascript
复制
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

进入核心tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)

这里是加锁的核心方法:

代码语言:javascript
复制
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    	//时间转化为毫秒值
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

这里最终的是执行了一段具有原子性的lua脚本,由CommandAsynExecutor执行; 这个锁最终持久化到redis时,使用的是hash类型的key field value; 这里注意下最后一行几个参数的对应关系: getName()), internalLockLeaseTime, getLockName(threadId) 分别是key[1],ARGV[1],ARGV[2];

  • getName()是逻辑锁名称,例如:我们发起锁的一方传递的锁名称 productId1672822;
  • internalLockLeaseTime是毫秒单位的锁过期时间;
  • getLockName则是锁对应的线程级别的名称,因为支持相同线程可重入,不同线程不可重入,所以这里的锁的生成方式是:UUID+":"threadId

Lua脚本中的执行分为以下三步:

  • 1:exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称KEYS[1],线程级别的锁名称[ARGV[2],value=1,设置到redis。并设置逻辑锁名称的过期时间ARGV[2],返回;
  • 2:如果检查到存在KEYS[1],[ARGV[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间
  • 3:key不存,直接返回key的剩余过期时间(-2)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-08-29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 追踪一下redission加锁的实现源码,并详细介绍核心加锁代码lua脚本的执行原理和过程。
  • 1.获取锁
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档