概要
环境准备Redis 如何实现分布式锁线程不安全单机锁分布式锁代码实现Redisson 集成和源码分析Redisson 集成源码分析 `RedissonLock`加锁解锁集群分布式锁失效判断机制总结REFERENCES更多
手机用户请
横屏
获取最佳阅读体验,REFERENCES
中是本文参考的链接,如需要链接和更多资源,可以关注其他博客发布地址。
平台 | 地址 |
---|---|
CSDN | https://blog.csdn.net/sinat_28690417 |
简书 | https://www.jianshu.com/u/3032cc862300 |
个人博客 | https://yiyuer.github.io/NoteBooks/ |
正文
Docker Redis
docker pull redis
docker run -itd --name redis-test -p 6379:6379 redis-6379
docker exec -it redis-test /bin/bash
.
Jmeter 用来模拟并发
官网
Linux
下载后解压 sh /bin/jemeter
Windows
运行 /bin/jmeter.bat
并发场景模拟: 商店库存 stock ,每次 http 接口调用,表示下单,并进行库存的 -1 操作
项目启动默认设置库存1000
/**
* <p>
*
* </p>
*
* @author Helios
* @date 2020/3/14 6:41 下午
*/
@Component
@Slf4j
public class AppStartListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (Objects.isNull(event.getApplicationContext().getParent())) {
initStock();
}
}
private void initStock() {
log.info("init goods stock started");
stringRedisTemplate.opsForValue().set("stock", "1000");
}
}
初始状态查询:库存位1000
.
/**
* 下单:线程不安全
* @return
*/
@RequestMapping("goods/v1/stock")
public String order() {
//查询库存
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", stock + "");
log.info("order success and stock -1");
} else {
log.error("order failed!");
}
return "request success" + " stock : " + stock;
}
通过 Jmeter 模拟 1s 内发起 200 次请求
.
再次查询库存
.
//...
private Object lock = new Object();
//...
/**
* 下单:synchronized 加锁,单机锁
*
* @return
*/
@RequestMapping("goods/v2/stock")
public String localOrderLock() {
//查询库存
Integer stock = 0;
synchronized (lock) {
stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", stock + "");
log.info("order success and stock -1");
} else {
log.error("order failed!");
}
}
return "request success" + " stock : " + stock;
}
查询库存
.
分析
上面的结果,可以看出,库存扣减正常,但是这仅仅针对的是单个 JVM 中运行的实例,无法在分布式场景中实现共享资源库存 stock
的访问控制。
Redis Setnx 命令
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
.
.
简单实现
/**
* 下单:setnx 加锁,分布式锁
*
* @return
*/
@RequestMapping("goods/v3/stock")
public String setnx() {
//redis setnx
//Boolean stockLock = stringRedisTemplate.opsForValue().setIfAbsent(productKey, "xxx");
//避免服务器异常,导致锁未被删除,设置超时时间
//stringRedisTemplate.expire(productKey, 30, TimeUnit.SECONDS);
//上面的非原子性,无法确保锁的超时时间一定会被设置
Boolean stockLock = stringRedisTemplate.opsForValue().setIfAbsent(productKey, "xxx",30, TimeUnit.SECONDS);
if (!stockLock) {
return "error";
}
Integer stock = 0;
//处理运行异常,避免死锁,确保系统正常运行场景下每次锁被删除
try {
//查询库存 redis.get
stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
// 更新库存
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", stock + "");
log.info("order success and stock -1");
} else {
log.error("order failed!");
}
} finally {
//库存减完后删除锁,确保其他线程不被一致阻塞
stringRedisTemplate.delete(productKey);
}
return "request success" + " stock : " + stock;
}
加锁逻辑和业务解耦
public interface IRedisLock {
boolean tryLock(String key, long timeout, TimeUnit timeUnit);
void releaseLock(String key);
}
@Component
public class RedisLockImpl implements IRedisLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private ThreadLocal<String> lockLocal = new ThreadLocal<>();
@Override
public boolean tryLock(String key, long timeout, TimeUnit timeUnit) {
String uuid = UUID.randomUUID().toString();
lockLocal.set(uuid);
boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid,timeout, timeUnit);
return lock;
}
@Override
public void releaseLock(String key) {
if (lockLocal.get().equals(stringRedisTemplate.opsForValue().get(key))) {
lockLocal.remove();
stringRedisTemplate.delete(key);
}
}
}
/**
* 下单:setnx 加锁,锁控制和业务逻辑解耦
*
* @return
*/
@RequestMapping("goods/v4/stock")
public String setnx2() {
//业务解耦
if (!redisLock.tryLock(productKey,30, TimeUnit.SECONDS)) {
//此处返回 error,实际业务中一般不希望直接返回错误给用户,而是让用户等待一段时间,因此需要实现阻塞锁,见 RedisLockImpl
return "error";
}
Integer stock = 0;
//处理运行异常,避免死锁,确保系统正常运行场景下每次锁被删除
try {
//TODO STH
} finally {
//库存减完后删除锁,确保其他线程不被一致阻塞
redisLock. releaseLock(productKey);
}
return "request success" + " stock : " + stock;
}
锁特性
/*
* @ProjectName: 编程学习
* @Copyright: 2019 HangZhou Ashe Dev, Ltd. All Right Reserved.
* @address: https://yiyuery.github.io/NoteBooks/
* @date: 2020/3/14 8:48 下午
* @description: 本内容仅限于编程技术学习使用,转发请注明出处.
*/
package com.example.redis.lock.impl;
import com.example.redis.lock.IRedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* <p>
*
* </p>
*
* @author Helios
* @date 2020/3/14 8:48 下午
*/
@Component
public class RedisLockImpl implements IRedisLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private ThreadLocal<String> lockValLocal = new ThreadLocal<>();
private ThreadLocal<Long> asyncThreadIDLocal = new ThreadLocal<>();
@Override
public boolean tryLock(String key, long timeout, TimeUnit timeUnit) {
// String uuid = UUID.randomUUID().toString();
// lockLocal.set(uuid);
// boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid,timeout, timeUnit);
//可重入实现
boolean lock = false;
if (Objects.isNull(lockValLocal.get())) {
String uuid = UUID.randomUUID().toString();
lockValLocal.set(uuid);
lock = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, timeUnit);
//阻塞锁
if (!lock) {
while (true) {
lock = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, timeUnit);
if (lock) {
break;
}
}
//避免锁超时时间提前结束,确保业务代码运行期间,锁不会因为超时而失效,导致线程不安全 => 业务代码执行过程中,锁异步续期(使用异步是因为要避免阻塞当前线程)
Thread thread = new Thread(() -> {
while (true) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
stringRedisTemplate.expire(key, timeout, timeUnit);
}
});
thread.start();
//为删除锁时需要结束续命线程, 记录线程 ID
asyncThreadIDLocal.set(thread.getId());
}
} else {
lock = true;
}
return lock;
}
@Override
public void releaseLock(String key) {
if (lockValLocal.get().equals(stringRedisTemplate.opsForValue().get(key))) {
lockValLocal.remove();
stringRedisTemplate.delete(key);
deleteAsyncExpireThread();
}
}
/**
* <p>
* 结束续命线程
* </p>
*
* @return
* @author Helios
* @date
* @params []
*/
private void deleteAsyncExpireThread() {
Long threadId = asyncThreadIDLocal.get();
Thread th = null;
if (Objects.nonNull(threadId)) {
asyncThreadIDLocal.remove();
ThreadGroup group = Thread.currentThread().getThreadGroup();
while (group != null) {
boolean stop = false;
Thread[] threads = new Thread[(int) (group.activeCount() * 1.2)];
int count = group.enumerate(threads, true);
for (int i = 0; i < count; i++) {
if (threadId == threads[i].getId()) {
th = threads[i];
stop = true;
break;
}
}
if (stop) {
break;
}
group = group.getParent();
}
if (Objects.nonNull(th)) {
th.interrupt();
}
}
}
}
最新版的Redisson
要求redis
能够支持eval
的命令,否则无法实现,即Redis
要求2.6
版本以上。在lua
脚本中可以调用大部分的Redis
命令,使用脚本的好处如下:
减少网络开销
:在Redis操作需求需要向Redis发送5次请求,而使用脚本功能完成同样的操作只需要发送一个请求即可,减少了网络往返时延。原子操作
:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。换句话说在编写脚本的过程中无需担心会出现竞态条件,也就无需使用事务。事务可以完成的所有功能都可以用脚本来实现。复用
:客户端发送的脚本会永久存储在Redis中,这就意味着其他客户端(可以是其他语言开发的项目)可以复用这一脚本而不需要使用代码完成同样的逻辑。增加依赖
compile 'org.redisson:redisson-spring-boot-starter:3.11.5
Redisson GitHub
简单使用
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
//@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port);
//.setPassword(password);
//添加主从配置
//config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
return Redisson.create(config);
}
}
@Autowired
private RedissonClient redissonClient;
//...
/**
* 下单:setnx 加锁,锁控制和业务逻辑解耦
*
* @return
*/
@RequestMapping("goods/v5/stock")
public String redissonLock() {
//Redisson Lock
RLock lock = redisson.getLock(productKey);
if (!lock.tryLock()) {
return "error";
}
Integer stock = 0;
//处理运行异常,避免死锁,确保系统正常运行场景下每次锁被删除
try {
//TODO STH
} finally {
//库存减完后删除锁,确保其他线程不被一致阻塞
lock.unlock();
}
return "request success" + " stock : " + stock;
}
.
.
使用到的全局变量
EXPIRATION_RENEWAL_MAP
:存储entryName和其过期时间,底层用的netty的PlatformDependent.newConcurrentHashMap()internalLockLeaseTime
:锁默认释放的时间:30 * 1000,即30秒id
:UUID,用作客户端的唯一标识pubSub
:订阅者模式,当释放锁的时候,其他客户端能够知道锁已经被释放的消息,并让队列中的第一个消费者获取锁。使用PUB/SUB消息机制的优点:减少申请锁时的等待时间、安全、 锁带有超时时间、锁的标识唯一,防止死锁 锁设计为可重入,避免死锁。commandExecutor
:命令执行器,异步执行器@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
//-- 阻塞锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//记录当前线程 ID
long threadId = Thread.currentThread().getId();
//如果为空,当前线程获取锁成功,否则已经被其他客户端加锁,TTL 表示剩余存活时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
//等待释放,并订阅锁
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
//如果没有拿到锁,循环获取:阻塞锁逻辑
while (true) {
// 重新尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// 成功获取锁
if (ttl == null) {
break;
}
//如果没拿到锁, 等待锁释放
// waiting for message
if (ttl >= 0) {
try {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
getEntry(threadId).getLatch().acquire();
} else {
getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
}
} finally {
//解除当前线程和锁事件的订阅
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
//-- 异步续期
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
// 内部异步框架,默认时30s 失效,此处 30/3 = 10 s 进行一次续期
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
//----
public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
private int counter;
private final Semaphore latch;
private final RPromise<RedissonLockEntry> promise;
private final ConcurrentLinkedQueue<Runnable> listeners = new ConcurrentLinkedQueue<Runnable>();
public RedissonLockEntry(RPromise<RedissonLockEntry> promise) {
super();
this.latch = new Semaphore(0);
this.promise = promise;
}
public void aquire() {
counter++;
}
public int release() {
return --counter;
}
public RPromise<RedissonLockEntry> getPromise() {
return promise;
}
public void addListener(Runnable listener) {
listeners.add(listener);
}
public boolean removeListener(Runnable listener) {
return listeners.remove(listener);
}
public ConcurrentLinkedQueue<Runnable> getListeners() {
return listeners;
}
public Semaphore getLatch() {
return latch;
}
}
//--- PublishSubscribe
public void unsubscribe(E entry, String entryName, String channelName) {
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
semaphore.acquire(new Runnable() {
@Override
public void run() {
if (entry.release() == 0) {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (!removed) {
throw new IllegalStateException();
}
service.unsubscribe(new ChannelName(channelName), semaphore);
} else {
semaphore.release();
}
}
});
}
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
if (e != null) {
cancelExpirationRenewal(threadId);
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
//-- 异步解锁 Lua 脚本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果keys[1]不存在,则返回
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//为哈希表 KEYS[1] 中的域 ARGV[3] 的值加上增量 -1 : 此处相当于可重入锁的计数状态 -1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
//如果还是重入状态,即锁仍然被持有,则进行续命,增加延时
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//反之,说明锁已全部释放,则删除当前 KEYS[1] 对应锁,并发布删除锁的消息通知
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
HINCRBY key field increment
: 为哈希表 key 中的域 field 的值加上增量 increment 。增量也可以为负数,相当于对给定域进行减法操作。
如果 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令。如果域 field 不存在,那么在执行命令前,域的值被初始化为 0 。对一个储存字符
串值的域 field 执行 HINCRBY 命令将造成一个错误。本操作的值被限制在 64 位(bit)有符号数字表示之内。
我们都知道 Redis 本身是支持集群模式的,那么如果其中一个或多个节点挂掉,对应的分布式锁如何在集群场景下保证功能的高可用呢?
这个缺陷其实很明显,如果只有一个 Redis 实例,一旦这个挂了,所有依赖他的服务都挂了。显然不太适合大型的应用。
为了避免单点故障,我们给Redis做一个Master/Slave的主从架构,一个Master,一台Slave。下面就会碰到这么一个问题。下面是使用场景。
分布式锁失效
Redlock算法
假设我们有N(假设5)个Redis master实例,所有节点相互独立,并且业务系统也是单纯的调用,并没有什么其他的类似消息重发之类的辅助系统。下面来模拟一下算法:
t1
减去t0
,计算获取锁所消耗的时间t2=t1-t0
。只有t2
小于锁的业务有效时间(也就是第二步的10秒),并且,客户端在至少3=(5/2+1)
台上获取到锁我们才认为锁获取成功。锁的释放
释放比较简单,直接删除所有实例上对应的key就好。
本文通过介绍 Redis 环境搭建和分布式锁的实战,介绍锁需要具备的一些特性,并分析了 Redisson 源码,以及集群部署 Redis 场景下,锁失效的判断机制介绍。
.