专栏首页HUC思梦的java专栏redis命令和lua实现分布式锁

redis命令和lua实现分布式锁

Redis分布式锁关键

SETNX

语法: SETNX key value

  • 如果key不存在,则存储(key:value)值,返回1
  • 如果key已经不存在,则不执行操作,返回0

因为这个命令的性质,多个线程竞争时只有一个线程能修改key的值。利用这一点可以实现锁的互斥功能。

Redis分布式锁实现

定义接口

public interface Lock {
    /**
     * 获取锁
     * @param lock 锁名称
     */
    void lock(String lock);

    /**
     * 释放锁
     * @param lock 锁名称
     */
    void unlock(String lock);
}
分布式锁代码实现:
public class DistributeLock implements Lock {
    private static final Logger logger  = LoggerFactory.getLogger(DistributeLock.class);

    private static final int LOCK_MAX_EXIST_TIME = 5;  // 单位s,一个线程持有锁的最大时间
    private static final String LOCK_PREX = "lock_"; // 作为锁的key的前缀

    private StringRedisTemplate redisTemplate;
    private String lockPrex; // 做为锁key的前缀
    private int lockMaxExistTime; // 单位s,一个线程持有锁的最大时间
    private ThreadLocal<String> threadId = new ThreadLocal<String>();  // 线程变量

    public DistributeLock(StringRedisTemplate redisTemplate){
        this(redisTemplate, LOCK_PREX, LOCK_MAX_EXIST_TIME);
    }

    public DistributeLock(StringRedisTemplate redisTemplate, String lockPrex, int lockMaxExistTime){
        this.redisTemplate = redisTemplate;
        this.lockPrex = lockPrex;
        this.lockMaxExistTime = lockMaxExistTime;
    }

    @Override
    public void lock(String lock){
        Assert.notNull(lock, "lock can't be null!");
        String lockKey = generatorLockKey(lock);
        BoundValueOperations<String,String> keyBoundValueOperations = redisTemplate.boundValueOps(lockKey);     
        while(true){
            // 如果上次拿到锁的是自己,则本次也可以拿到锁:实现可重入
            String value = keyBoundValueOperations.get();
            // 根据传入的值,判断用户是否持有这个锁
            if(value != null && value.equals(String.valueOf(threadId.get()))){
                // 重置过期时间
                keyBoundValueOperations.expire(lockMaxExistTime, TimeUnit.SECONDS);
                break;
            }

            if(keyBoundValueOperations.setIfAbsent(lockKey)){
                // 每次获取锁时,必须重新生成id值
                String keyUniqueId = UUID.randomUUID().toString(); // 生成key的唯一值
                threadId.set(keyUniqueId);
                // 显设置value,再设置过期日期,否则过期日期无效
                keyBoundValueOperations.set(String.valueOf(keyUniqueId));
                // 为了避免一个用户拿到锁后,进行过程中没有正常释放锁,这里设置一个默认过期时间,这段非常重要,如果没有,则会造成死锁
                keyBoundValueOperations.expire(lockMaxExistTime, TimeUnit.SECONDS);
                // 拿到锁后,跳出循环
                break;
            }else{
                try {
                    // 短暂休眠,nano避免出现活锁 
                    Thread.sleep(10, (int)(Math.random() * 500));
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }


    /**
     * 释放锁,同时要考虑当前锁是否为自己所有,以下情况会导致当前线程失去锁:线程执行的时间超过超时的时间,导致此锁被其它线程拿走; 此时用户不可以执行删除
     * 
     * 以上方法的缺陷:
     *  a. 在本线程获取值,判断锁本线程所有,但是在执行删除前,锁超时被释放同时被另一个线程获取,则本操作释放锁
     * 
     * 最终解决方案
     *  a. 使用lua脚本,保证检测和删除在同一事物中
     * 
     */
    @Override
    public void unlock(final String lock) {
        final String lockKey = generatorLockKey(lock);
        BoundValueOperations<String,String> keyBoundValueOperations = redisTemplate.boundValueOps(lockKey);
        String lockValue = keyBoundValueOperations.get();
        if(!StringUtils.isEmpty(lockValue) && lockValue.equals(threadId.get())){
            redisTemplate.delete(lockKey);
        }else{
            logger.warn("key=[{}]已经变释放了,本次不执行释放. 线程Id[{}] ", lock, lockValue);  
        }
    }

    /**
     * 生成key
     * @param lock
     * @return
     */
    private String generatorLockKey(String lock){
        StringBuilder sb = new StringBuilder();
        sb.append(lockPrex).append(lock);
        return sb.toString();
    }

}

1、ThreadLocal threadId:通过threadId保存每个线程锁的UUID值,用于区分当前锁是否为自己所有,并且锁的value也存储此值 2、lock主要逻辑:通过BoundValueOperations的setIfAbsent设置lockKey值(setIfAbsent其实就是封装了SETNX的命令),如果返回true,则表示已经获取锁;如果返回false,则进入等待 unlock主要逻辑:通过redisTemplate.delete释放锁。在释放锁前,需要判断当前锁被当前线程所有,如果是,才执行释放锁,否则不执行 3、避免死锁:如果线程A拿到锁后,在执行释放锁前,突然死掉了,则其它线程都无法再次获取锁,从而出现死锁。为了避免死锁,我们获取锁后,需要为锁设置一个有效期,即使锁的拥有者死掉了,此锁也可以被自动释放 4、锁可重入:线程A拿到锁后,如果他再次执行lock,也可以再次拿到锁,而不是出现在等待锁的队列中; 如果当前线程已经获取锁,则再次请求锁则一定可以获取锁,否则会出现自己等待自己释放锁,从而出现死锁

封装分布式锁代码逻辑暴露调用API
public interface LockManager {
    /**
     * 通过加锁安全执行程序,无返回的数据
     * @param lockKeyName key名称
     * @param callback  
     */
    void lockCallBack(String lockKeyName, SimpleCallBack callback);
    /**
     * 通过加锁安全执行程序,有返回数据
     * @param lockKeyName
     * @param callback
     * @return
     */
    <T> T lockCallBackWithRtn(String lockKeyName, ReturnCallBack<T> callback);
}
@Component
public class SimpleRedisLockManager implements LockManager {   

    @Autowired
    protected StringRedisTemplate redisTemplate;

    protected Lock distributeLock; // 分布锁

    @PostConstruct
    public void init(){
        // 初始化锁
        distributeLock = new DistributeLock(redisTemplate, "mylock_", 5);
    }

    @Override
    public void lockCallBack(String lockKeyName, SimpleCallBack callback){
        Assert.notNull("lockKeyName","lockKeyName 不能为空");
        Assert.notNull("callback","callback 不能为空");
        try{
            // 获取锁
            distributeLock.lock(lockKeyName);
            callback.execute();
        }finally{
            // 必须释放锁
            distributeLock.unlock(lockKeyName);
        }
    }

    @Override
    public <T> T lockCallBackWithRtn(String lockKeyName, ReturnCallBack<T> callback){
        Assert.notNull("lockKeyName","lockKeyName 不能为空");
        Assert.notNull("callback","callback 不能为空");
        try{
            // 获取锁
            distributeLock.lock(lockKeyName);
            return callback.execute();
        }finally{
            // 必须释放锁
            distributeLock.unlock(lockKeyName);
        }
    }
}
/**
 * 无返回值的回调函数
 * @author hry
 *
 */
public interface SimpleCallBack {
    void execute();
}
/**
 * 有返回数据的回调函数
 * 
 * @author hry
 *
 * @param <T>
 */
public interface ReturnCallBack<T> {
    T execute();
}
测试分布式锁
@Autowired
private SimpleRedisLockManager simpleRedisLockManager;

 simpleRedisLockManager.lockCallBack("distributeLock" + ThreadLocalRandom.current().nextInt(1000), new SimpleCallBack() {
    @Override
    public void execute() {
        System.out.println("lockCallBack");
    }
});

1、如果线程A拿到锁超过规定的时间还没有结束,则此时redis会自动释放锁。此时线程B拿到锁,则同时线程A和线程B同时拿到锁。对于这种情况,可以通过设置合理的超时时间解决。 2、如果并发量很大,则可能出现多个线程同时拥有锁。这是因为在DistributeLock的lock和unlock方法都执行多条语句且这些语句不是事务的。比如线程A在unlock时,通过get方法得知自己拥有锁,然后他执行释放锁操作。在这两个操作之间,redis发现锁到期,自动删除锁,此时线程B申请并且得到锁。这时线程A才执行删除锁操作,则另外线程C也可以得到锁,此时线程B,C同时得到锁。这种情况可以通过下文的lua方法解决

改进代码加入lua脚本保证原子性操作
lock.lua => 加锁脚本
-- Set a lock
--  如果获取锁成功,则返回 1
local key     = KEYS[1]
local content = KEYS[2]
local ttl     = ARGV[1]
local lockSet = redis.call('setnx', key, content)
if lockSet == 1 then
  redis.call('pexpire', key, ttl)
else 
  -- 如果value相同,则认为是同一个线程的请求,则认为重入锁
  local value = redis.call('get', key)
  if(value == content) then
    lockSet = 1;
    redis.call('pexpire', key, ttl)
  end
end
return lockSet
----------------------------
unlock.lua => 解锁脚本
-- unlock key
local key     = KEYS[1]
local content = KEYS[2]
local value = redis.call('get', key)
if value == content then
  return redis.call('del', key);
end
return 0
基于lua脚本实现分布式锁
public class LuaDistributeLock implements Lock {
    private static final int LOCK_MAX_EXIST_TIME = 5;  // 单位s,一个线程持有锁的最大时间
    private static final String LOCK_PREX = "lock_"; // 作为锁的key的前缀

    private StringRedisTemplate redisTemplate;
    private String lockPrex; // 做为锁key的前缀
    private int lockMaxExistTime; // 单位s,一个线程持有锁的最大时间
    private DefaultRedisScript<Long> lockScript; // 加锁锁脚本
    private DefaultRedisScript<Long> unlockScript; // 解锁脚本

    // 线程变量
    private ThreadLocal<String> threadKeyId = new ThreadLocal<String>(){
        @Override
        protected String initialValue() {
            return UUID.randomUUID().toString();
        }
    };  

    public LuaDistributeLock(StringRedisTemplate redisTemplate){
        this(redisTemplate, LOCK_PREX, LOCK_MAX_EXIST_TIME);
    }

    public LuaDistributeLock(StringRedisTemplate redisTemplate, String lockPrex, int lockMaxExistTime){
        this.redisTemplate = redisTemplate;
        this.lockPrex = lockPrex;
        this.lockMaxExistTime = lockMaxExistTime;
        // init
        init();
    }

    /**
     * 初始化lua的加锁和解锁脚本对象
     */
    public void init() {
        // Lock script
        lockScript = new DefaultRedisScript<Long>();
        lockScript.setScriptSource(
            new ResourceScriptSource(new ClassPathResource("com/mmren/edu/spring/boot/redis/distributedlock/lock.lua")));
        lockScript.setResultType(Long.class);
        // unlock script
        unlockScript = new DefaultRedisScript<Long>();
        unlockScript.setScriptSource(
            new ResourceScriptSource(new ClassPathResource("com/mmren/edu/spring/boot/redis/distributedlock/unlock.lua")));
        unlockScript.setResultType(Long.class);
    }

    @Override
    public void lock(String lock2){
        Assert.notNull(lock2, "lock2 can't be null!");
        String lockKey = generatorLockKey(lock2);
        while(true){
            List<String> keyList = new ArrayList<String>();
            keyList.add(lockKey);
            keyList.add(threadKeyId.get());
            if(redisTemplate.execute(lockScript, keyList, String.valueOf(lockMaxExistTime * 1000)) > 0){
                break;
            }else{
                try {
                    // 短暂休眠,nano避免出现活锁 
                    Thread.sleep(10, (int)(Math.random() * 500));
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }


    /**
     * 释放锁,同时要考虑当前锁是否为自己所有,以下情况会导致当前线程失去锁:线程执行的时间超过超时的时间,导致此锁被其它线程拿走; 此时用户不可以执行删除
     */
    @Override
    public void unlock(final String lock) {
        final String lockKey = generatorLockKey(lock);
        List<String> keyList = new ArrayList<String>();
        keyList.add(lockKey);
        keyList.add(threadKeyId.get());
        redisTemplate.execute(unlockScript, keyList);
    }

    /**
     * 生成key
     * @param lock
     * @return
     */
    private String generatorLockKey(String lock){
        StringBuilder sb = new StringBuilder();
        sb.append(lockPrex).append(lock);
        return sb.toString();
    }

}
封装lua分布式锁暴露API
@Component
public class LuaLockRedisLockManager extends SimpleRedisLockManager {
    @PostConstruct
    public void init(){
        // 初始化锁
        distributeLock = new LuaDistributeLock(redisTemplate, "mylock_", 5);
    }
}
测试lua分布式锁
@Autowired
private LuaLockRedisLockManager luaLockRedisLockManager;

luaLockRedisLockManager.lockCallBack("distributeLock2" + ThreadLocalRandom.current().nextInt(1000), new SimpleCallBack() {
    @Override
    public void execute() {
        System.out.println("distributeLock2");
    }
});

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 使用ExecutorService实现线程池

      当一个程序中创建了许多线程,并在任务结束后销毁,会给系统带来过度消耗资源,以及过度切换线程的危险,从而可能导致系统崩溃。为此我们应使用线程池来解决这个问题。

    HUC思梦
  • JAVA多线程及补充

    进程 运行中的应用程序叫进程,每个进程运行时,都有自已的地址空间(内存空间) 如IE浏览器在任务管器中可以看到 操作系统都是支持多进程的

    HUC思梦
  • synchronized底层揭秘

    上篇文章我们从硬件级别探索,对可见性和有序性的认识上升了一个高度,却迟迟没有介绍原子性的解决方案。

    HUC思梦
  • 小白需懂的异步请求的处理

    在我们传统的服务中,当一个HTTP请求过来时,tomcat或者是其他的中间件都会有一个主线程来处理请求,所有的业务逻辑都会在这个线程里面处理完,...

    用户7386338
  • 在.NET Core 中的并发编程

    原文地址:http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core 今...

    企鹅号小编
  • Java深入

    java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:

    大学里的混子
  • 谈谈线程

    关于Java 的线程问题,我们上大学的时候,计算机专业的学生肯定会遇到这两个名词–线程和进程,老师和我们说一个进程里面可以有多个线程,这里也引出了多线程的概念。

    MiChong
  • JavaScript是如何处理事件?

    #思特沃克好声音# (图片:网络) 想必大家都知道JavaScript一般都是在浏览器中执行,大家也知道可以通过事件调用JavaScript函数,可是大家清楚J...

    ThoughtWorks
  • java 面试杂记

    1.git 是分布式的,svn不是,每个开发人员从中心版本库/服务器上chect out代码后会在自己的机器上克隆一个自己的版本库。 2.git 把内容安装元...

    黑白格
  • 深入JDK源码之ThreadLocal类

    ThreadLocal概述 学习JDK中的类,首先看下JDK API对此类的描述,描述如下: 该类提供了线程局部 (thread-local) 变量。这些变量不...

    用户1263954

扫码关注云+社区

领取腾讯云代金券