前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于redis的分布式锁二种应用场景

基于redis的分布式锁二种应用场景

作者头像
菩提树下的杨过
发布2019-06-17 15:17:18
1.1K0
发布2019-06-17 15:17:18
举报

“分布式锁”是用来解决分布式应用中“并发冲突”的一种常用手段,实现方式一般有基于zookeeper及基于redis二种。具体到业务场景中,我们要考虑二种情况:

一、抢不到锁的请求,允许丢弃(即:忽略)

比如:一些不是很重要的场景,比如“监控数据持续上报”,某一篇文章的“已读/未读”标识位更新,对于同一个id,如果并发的请求同时到达,只要有一个请求处理成功,就算成功。

用活动图表示如下:

二、并发请求,不论哪一条都必须要处理的场景(即:不允许丢数据)

比如:一个订单,客户正在前台修改地址,管理员在后台同时修改备注。地址和备注字段的修改,都必须正确更新,这二个请求同时到达的话,如果不借助db的事务,很容易造成行锁竞争,但用事务的话,db的性能显然比不上redis轻量。

解决思路:A,B二个请求,谁先抢到分布式锁(假设A先抢到锁),谁先处理,抢不到的那个(即:B),在一旁不停等待重试,重试期间一旦发现获取锁成功,即表示A已经处理完,把锁释放了。这时B就可以继续处理了。

但有二点要注意:

a、需要设置等待重试的最长时间,否则如果A处理过程中有bug,一直卡死,或者未能正确释放锁,B就一直会等待重试,但是又永远拿不到锁。

b、等待最长时间,必须小于锁的过期时间。否则,假设锁2秒过期自动释放,但是A还没处理完(即:A的处理时间大于2秒),这时锁会因为redis key过期“提前”误释放,B重试时拿到锁,造成A,B同时处理。(注:可能有同学会说,不设置锁的过期时间,不就完了么?理论上讲,确实可以这么做,但是如果业务代码有bug,导致处理完后没有unlock,或者根本忘记了unlock,分布式锁就会一直无法释放。所以综合考虑,给分布式锁加一个“保底”的过期时间,让其始终有机会自动释放,更为靠谱)

用活动图表示如下:

写了一个简单的工具类:

代码语言:javascript
复制
package com.cnblogs.yjmyzz.redisdistributionlock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.StringUtils;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * 利用redis获取分布式锁
 *
 * @author 菩提树下的杨过
 * @blog http://yjmyzz.cnblogs.com/
 */
public class RedisLock {

    private StringRedisTemplate redisTemplate;

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * simple lock尝试获取锅的次数
     */
    private int retryCount = 3;

    /**
     * 每次尝试获取锁的重试间隔毫秒数
     */
    private int waitIntervalInMS = 100;


    public RedisLock(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 利用redis获取分布式锁(未获取锁的请求,允许丢弃!)
     *
     * @param redisKey       锁的key值
     * @param expireInSecond 锁的自动释放时间(秒)
     * @return
     * @throws DistributionLockException
     */
    public String simpleLock(final String redisKey, final int expireInSecond) throws DistributionLockException {
        String lockValue = UUID.randomUUID().toString();
        boolean flag = false;
        if (StringUtils.isEmpty(redisKey)) {
            throw new DistributionLockException("key is empty!");
        }
        if (expireInSecond <= 0) {
            throw new DistributionLockException("expireInSecond must be bigger than 0");
        }
        try {
            for (int i = 0; i < retryCount; i++) {
                boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, lockValue, expireInSecond, TimeUnit.SECONDS);
                if (success) {
                    flag = true;
                    break;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(waitIntervalInMS);
                } catch (Exception ignore) {
                    logger.warn("redis lock fail: " + ignore.getMessage());

                }
            }
            if (!flag) {
                throw new DistributionLockException(Thread.currentThread().getName() + " cannot acquire lock now ...");
            }
            return lockValue;
        } catch (DistributionLockException be) {
            throw be;
        } catch (Exception e) {
            logger.warn("get redis lock error, exception: " + e.getMessage());
            throw e;
        }
    }

    /**
     * 利用redis获取分布式锁(未获取锁的请求,将在timeoutSecond时间范围内,一直等待重试)
     *
     * @param redisKey       锁的key值
     * @param expireInSecond 锁的自动释放时间(秒)
     * @param timeoutSecond  未获取到锁的请求,尝试重试的最久等待时间(秒)
     * @return
     * @throws DistributionLockException
     */
    public String lock(final String redisKey, final int expireInSecond, final int timeoutSecond) throws DistributionLockException {
        String lockValue = UUID.randomUUID().toString();
        boolean flag = false;
        if (StringUtils.isEmpty(redisKey)) {
            throw new DistributionLockException("key is empty!");
        }
        if (expireInSecond <= 0) {
            throw new DistributionLockException("expireInSecond must be greater than 0");
        }
        if (timeoutSecond <= 0) {
            throw new DistributionLockException("timeoutSecond must be greater than 0");
        }
        if (timeoutSecond >= expireInSecond) {
            throw new DistributionLockException("timeoutSecond must be less than expireInSecond");
        }
        try {
            long timeoutAt = System.currentTimeMillis() + timeoutSecond * 1000;
            while (true) {
                boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, lockValue, expireInSecond, TimeUnit.SECONDS);
                if (success) {
                    flag = true;
                    break;
                }
                if (System.currentTimeMillis() >= timeoutAt) {
                    break;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(waitIntervalInMS);
                } catch (Exception ignore) {
                    logger.warn("redis lock fail: " + ignore.getMessage());
                }
            }
            if (!flag) {
                throw new DistributionLockException(Thread.currentThread().getName() + " cannot acquire lock now ...");
            }
            return lockValue;
        } catch (DistributionLockException be) {
            throw be;
        } catch (Exception e) {
            logger.warn("get redis lock error, exception: " + e.getMessage());
            throw e;
        }
    }


    /**
     * 锁释放
     *
     * @param redisKey
     * @param lockValue
     */
    public void unlock(final String redisKey, final String lockValue) {
        if (StringUtils.isEmpty(redisKey)) {
            return;
        }
        if (StringUtils.isEmpty(lockValue)) {
            return;
        }
        try {
            String currLockVal = redisTemplate.opsForValue().get(redisKey);
            if (currLockVal != null && currLockVal.equals(lockValue)) {
                boolean result = redisTemplate.delete(redisKey);
                if (!result) {
                    logger.warn(Thread.currentThread().getName() + " unlock redis lock fail");
                } else {
                    logger.info(Thread.currentThread().getName() + " unlock redis lock:" + redisKey + " successfully!");
                }
            }
        } catch (Exception je) {
            logger.warn(Thread.currentThread().getName() + " unlock redis lock error:" + je.getMessage());
        }
    }
}

然后写个spring-boot来测试一下:

代码语言:javascript
复制
package com.cnblogs.yjmyzz.redisdistributionlock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class RedisDistributionLockApplication {

    private static Logger logger = LoggerFactory.getLogger(RedisDistributionLockApplication.class);

    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(RedisDistributionLockApplication.class, args);

        //初始化
        StringRedisTemplate redisTemplate = applicationContext.getBean(StringRedisTemplate.class);
        RedisLock redisLock = new RedisLock(redisTemplate);
        String lockKey = "lock:test";


        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch threadsLatch = new CountDownLatch(2);

        final int lockExpireSecond = 5;
        final int timeoutSecond = 3;

        Runnable lockRunnable = () -> {
            String lockValue = "";
            try {
                //等待发令枪响,防止线程抢跑
                start.await();

                //允许丢数据的简单锁示例
                lockValue = redisLock.simpleLock(lockKey, lockExpireSecond);


                //不允许丢数据的分布式锁示例
                //lockValue = redisLock.lock(lockKey, lockExpireSecond, timeoutSecond);

                //停一会儿,故意让后面的线程抢不到锁
                TimeUnit.SECONDS.sleep(2);
                logger.info(String.format("%s get lock successfully, value:%s", Thread.currentThread().getName(), lockValue));

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                redisLock.unlock(lockKey, lockValue);
                //执行完后,计数减1
                threadsLatch.countDown();
            }

        };

        Thread t1 = new Thread(lockRunnable, "T1");
        Thread t2 = new Thread(lockRunnable, "T2");

        t1.start();
        t2.start();

        //预备:开始!
        start.countDown();

        //等待所有线程跑完
        threadsLatch.await();

        logger.info("======>done!!!");

    }

}

 用2个线程模拟并发场景,跑起来后,输出如下:

可以看到T2线程没抢到锁,直接抛出了预期的异常。

把44行的注释打开,即:换成不允许丢数据的模式,再跑一下:

可以看到,T1先抢到锁,然后经过2秒的处理后,锁释放,这时T2重试拿到了锁,继续处理,最终释放。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-06-15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档