实践基于Redis的分布式锁应用场景实现方式基于Redis的实践参考资料

本文来自社区这周的讨论话题—— 技术专题讨论第四期:漫谈分布式锁,也总结了我对分布式锁的认知和使用经验。

应用场景

当多个机器(多个进程)会对同一条数据进行修改时,并且要求这个修改是原子性的。这里有两个限定:(1)多个进程之间的竞争,意味着JDK自带的锁失效;(2)原子性修改,意味着数据是有状态的,修改前后有依赖。

实现方式

  • 基于Redis实现,主要基于redis的setnx(set if not exist)命令;
  • 基于Zookeeper实现;
  • 基于version字段实现,乐观锁,两个线程可以同时读取到原有的version值,但是最终只有一个可以完成操作;

这三种方式中,我接触过第一和第三种。基于redis的分布式锁功能更加强大,可以实现阻塞和非阻塞锁。

基于Redis的实践

锁的实现

  • 锁的key为目标数据的唯一键,value为锁的期望超时时间点;
  • 首先进行一次setnx命令,尝试获取锁,如果获取成功,则设置锁的最终超时时间(以防在当前进程获取锁后奔溃导致锁无法释放);如果获取锁失败,则检查当前的锁是否超时,如果发现没有超时,则获取锁失败;如果发现锁已经超时(即锁的超时时间小于等于当前时间),则再次尝试获取锁,取到后判断下当前的超时时间和之前的超时时间是否相等,如果相等则说明当前的客户端是排队等待的线程里的第一个尝试获取锁的,让它获取成功即可。

基于redis实现分布式锁逻辑.png

public class RedisDistributionLock {

    private static final Logger logger = LoggerFactory.getLogger(RedisDistributionLock.class);

    //key的TTL,一天
    private static final int finalDefaultTTLwithKey = 24 * 3600;

    //锁默认超时时间,20秒
    private static final long defaultExpireTime = 20 * 1000;

    private static final boolean Success = true;
    
    @Resource( name = "redisTemplate")
    private RedisTemplate<String, String> redisTemplateForGeneralize;

    /**
     * 加锁,锁默认超时时间20秒
     * @param resource
     * @return
     */
    public boolean lock(String resource) {
        return this.lock(resource, defaultExpireTime);
    }

    /**
     * 加锁,同时设置锁超时时间
     * @param key 分布式锁的key
     * @param expireTime 单位是ms
     * @return
     */
    public boolean lock(String key, long expireTime) {

        logger.debug("redis lock debug, start. key:[{}], expireTime:[{}]",key,expireTime);
        long now = Instant.now().toEpochMilli();
        long lockExpireTime = now + expireTime;

        //setnx
        boolean executeResult = redisTemplateForGeneralize.opsForValue().setIfAbsent(key,String.valueOf(lockExpireTime));
        logger.debug("redis lock debug, setnx. key:[{}], expireTime:[{}], executeResult:[{}]", key, expireTime,executeResult);

        //取锁成功,为key设置expire
        if (executeResult == Success) {
            redisTemplateForGeneralize.expire(key,finalDefaultTTLwithKey, TimeUnit.SECONDS);
            return true;
        }
        //没有取到锁,继续流程
        else{
            Object valueFromRedis = this.getKeyWithRetry(key, 3);
            // 避免获取锁失败,同时对方释放锁后,造成NPE
            if (valueFromRedis != null) {
                //已存在的锁超时时间
                long oldExpireTime = Long.parseLong((String)valueFromRedis);
                logger.debug("redis lock debug, key already seted. key:[{}], oldExpireTime:[{}]",key,oldExpireTime);
                //锁过期时间小于当前时间,锁已经超时,重新取锁
                if (oldExpireTime <= now) {
                    logger.debug("redis lock debug, lock time expired. key:[{}], oldExpireTime:[{}], now:[{}]", key, oldExpireTime, now);
                    String valueFromRedis2 = redisTemplateForGeneralize.opsForValue().getAndSet(key, String.valueOf(lockExpireTime));
                    long currentExpireTime = Long.parseLong(valueFromRedis2);
                    //判断currentExpireTime与oldExpireTime是否相等
                    if(currentExpireTime == oldExpireTime){
                        //相等,则取锁成功
                        logger.debug("redis lock debug, getSet. key:[{}], currentExpireTime:[{}], oldExpireTime:[{}], lockExpireTime:[{}]", key, currentExpireTime, oldExpireTime, lockExpireTime);
                        redisTemplateForGeneralize.expire(key, finalDefaultTTLwithKey, TimeUnit.SECONDS);
                        return true;
                    }else{
                        //不相等,取锁失败
                        return false;
                    }
                }
            }
            else {
                logger.warn("redis lock,lock have been release. key:[{}]", key);
                return false;
            }
        }
        return false;
    }

    private Object getKeyWithRetry(String key, int retryTimes) {
        int failTime = 0;
        while (failTime < retryTimes) {
            try {
                return redisTemplateForGeneralize.opsForValue().get(key);
            } catch (Exception e) {
                failTime++;
                if (failTime >= retryTimes) {
                    throw e;
                }
            }
        }
        return null;
    }

    /**
     * 解锁
     * @param key
     * @return
     */
    public boolean unlock(String key) {
        logger.debug("redis unlock debug, start. resource:[{}].",key);
        redisTemplateForGeneralize.delete(key);
        return Success;
    }
}

自定义注解使用分布式锁

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedisLockAnnoation {

    String keyPrefix() default "";

    /**
     * 要锁定的key中包含的属性
     */
    String[] keys() default {};

    /**
     * 是否阻塞锁;
     * 1. true:获取不到锁,阻塞一定时间;
     * 2. false:获取不到锁,立即返回
     */
    boolean isSpin() default true;

    /**
     * 超时时间
     */
    int expireTime() default 10000;

    /**
     * 等待时间
     */
    int waitTime() default 50;

    /**
     * 获取不到锁的等待时间
     */
    int retryTimes() default 20;
}

实现分布式锁的逻辑

@Component
@Aspect
public class RedisLockAdvice {

    private static final Logger logger = LoggerFactory.getLogger(RedisLockAdvice.class);

    @Resource
    private RedisDistributionLock redisDistributionLock;

    @Around("@annotation(RedisLockAnnoation)")
    public Object processAround(ProceedingJoinPoint pjp) throws Throwable {
        //获取方法上的注解对象
        String methodName = pjp.getSignature().getName();
        Class<?> classTarget = pjp.getTarget().getClass();
        Class<?>[] par = ((MethodSignature) pjp.getSignature()).getParameterTypes();
        Method objMethod = classTarget.getMethod(methodName, par);
        RedisLockAnnoation redisLockAnnoation = objMethod.getDeclaredAnnotation(RedisLockAnnoation.class);

        //拼装分布式锁的key
        String[] keys = redisLockAnnoation.keys();
        Object[] args = pjp.getArgs();
        Object arg = args[0];
        StringBuilder temp = new StringBuilder();
        temp.append(redisLockAnnoation.keyPrefix());
        for (String key : keys) {
            String getMethod = "get" + StringUtils.capitalize(key);
            temp.append(MethodUtils.invokeExactMethod(arg, getMethod)).append("_");
        }
        String redisKey = StringUtils.removeEnd(temp.toString(), "_");

        //执行分布式锁的逻辑
        if (redisLockAnnoation.isSpin()) {
            //阻塞锁
            int lockRetryTime = 0;
            try {
                while (!redisDistributionLock.lock(redisKey, redisLockAnnoation.expireTime())) {
                    if (lockRetryTime++ > redisLockAnnoation.retryTimes()) {
                        logger.error("lock exception. key:{}, lockRetryTime:{}", redisKey, lockRetryTime);
                        throw ExceptionUtil.geneException(CommonExceptionEnum.SYSTEM_ERROR);
                    }
                    ThreadUtil.holdXms(redisLockAnnoation.waitTime());
                }
                return pjp.proceed();
            } finally {
                redisDistributionLock.unlock(redisKey);
            }
        } else {
            //非阻塞锁
            try {
                if (!redisDistributionLock.lock(redisKey)) {
                    logger.error("lock exception. key:{}", redisKey);
                    throw ExceptionUtil.geneException(CommonExceptionEnum.SYSTEM_ERROR);
                }
                return pjp.proceed();
            } finally {
                redisDistributionLock.unlock(redisKey);
            }
        }
    }
}

参考资料

  1. Java分布式锁三种实现方案
  2. Java注解的基础与高级应用
  3. 基于 AOP 和 Redis 实现的分布式锁
  4. 如何高效排查系统故障?一分钱引发的系统设计“踩坑”案例
  5. 用redis实现分布式锁

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏用户2442861的专栏

Python标准模块logging

http://blog.csdn.net/fxjtoday/article/details/6307285

931
来自专栏Android群英传

NDK Maping 发布啦

1132
来自专栏大数据平台TBDS

Flume-Hbase-Sink针对不同版本flume与HBase的适配研究与经验总结

导语:本文细致而全面地讲解使用flume输出数据到HBase的三种不同 Flume-Hbase-Sink 之间的差异性,以及技术细节。并且透彻而全面地总结了不同...

2.1K12
来自专栏Jerry的SAP技术分享

Hybris Enterprise Commerce Platform 服务层的设计与实现

Hybris Enterprise Commerce Platform这个系列之前已经由我的同事,SAP成都研究院Hybris开发团队的同事张健(Zhang J...

1132
来自专栏菩提树下的杨过

rpc框架之avro 学习 1 - hello world

avro是hadoop的一个子项目,提供的功能与thrift、Protocol Buffer类似,都支持二进制高效序列化,也自带RPC机制,但是avro使用起来...

25910
来自专栏Ryan Miao

Spring cache简单使用guava cache

Spring cache简单使用 前言 spring有一套和各种缓存的集成方式。类似于sl4j,你可以选择log框架实现,也一样可以实现缓存实现,比如ehcac...

7197
来自专栏黑泽君的专栏

day50_BOS项目_02

我们再补上IUserDao和UserDaoImpl的示例代码: IUserDao.java

852
来自专栏木木玲

Netty in Action ——— Netty的组件和设计

1704
来自专栏LinkedBear的个人空间

浅析RPC与WebService

虽然现在非常火的RPC技术以SpringCloud和Dubbo(x)为主流,但是如果做接口调用,还是逃不了要用一些较传统的技术。前几天在做接口调用时恰巧用到了W...

3471
来自专栏cmazxiaoma的架构师之路

IDEA入门(1)--lombok和Junit generator2插件的运用

2383

扫码关注云+社区

领取腾讯云代金券