我们不生产代码,我们是代码的搬运工
前不久,阿里大牛虾总再次抛出了分布式锁的讨论,对照之前项目中实现的redis分布式锁总结一下
天才是1%的灵感,加上99%的汗水;编程是1%的编码,加上99%的在Google/StackOverflow/Github上找代码 残酷的现实是,找来的代码可能深藏bug,而不知
在多核多线程环境中,通过锁机制,在某一个时间点上,只能有一个线程进入临界区代码,从而保证临界区中操作数据的一致性
可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。 这把锁要是一把可重入锁(避免死锁) 支持阻塞和非阻塞:和 ReentrantLock 一样支持 lock 和 trylock 以及 tryLock(long timeOut) 这把锁最好是一把公平锁(根据业务需求考虑要不要这条) 有高可用的获取锁和释放锁功能 获取锁和释放锁的性能要好
理论知识知道得再多,还得落地才行;只要遵从三要素,就能打造一把好锁,不要拘泥于某一种工具。
网上有很多实现方式,主要是”外部存储“使用了不同的组件,比如数据库,redis,zk,由于这些组件各自特性的不同,实现复杂度各有不同
这儿主要说下在实际工作中使用到的两种方式,数据库与redis
数据库,任何系统都需要的组件,常规手法,都是使用version来实现乐观锁
比如A、B操作员同时读取一余额为1000元的账户,A操作员为该账户增加100元,B操作员同时为该账户扣除50元,A先提交,B后提交。最后实际账户余额为1000-50=950元,但本该为1000+100-50=1050。这就是典型的并发问题
假设数据库中帐户信息表中有一个version字段,当前值为1;而当前帐户余额字段(balance)为1000元。假设操作员A先更新完,操作员B后更新。 a、操作员A此时将其读出(version=1),并从其帐户余额中增加100(1000+100=1100)。 b、在操作员A操作的过程中,操作员B也读入此用户信息(version=1),并从其帐户余额中扣除50(1000-50=950)。 c、操作员A完成了修改工作,将数据版本号加一(version=2),连同帐户增加后余额(balance=1100),提交至数据库更新,此时由于提交数据版本大于数据库记录当前版本,数据被更新,数据库记录version更新为2。 d、操作员B完成了操作,也将版本号加一(version=2)试图向数据库提交数据(balance=950),但此时比对数据库记录版本时发现,操作员B提交的数据版本号为2,数据库记录当前版本也为2,不满足 “提交版本必须大于记录当前版本才能执行更新 “的乐观锁策略,因此,操作员B的提交被驳回。 这样,就避免了操作员B用基于version=1的旧数据修改的结果覆盖操作员A的操作结果的可能。
set balance=1100,version=version+1 where id=#{id} and version=#{version};
version简单,除了对业务数据表有侵入性,还有一些场景是胜任不了
比如,在操作一个数量之前,需要确认一下能不能操作
int countLimit = select count from limit where id = ${id};
if(countlimit>0){
set balance=1100,version=version+1 where id=#{id} and version=#{version};
}
update count;
这儿操作了多张表,此时就需要再配合事务,才能保证原子性
由于db性能的限制,而redis性能卓越,很多时候会选择redis实现方式
怎么使用redis正确地实现分布式锁,需要了解两方面
setnx 命令(『SET if Not eXists』(如果不存在,则 SET)的简写): 设置成功,返回 1 设置失败,返回 0 该命令是原子操作
getset 命令: 自动将key对应到value并且返回原来key对应的value。如果key存在但是对应的value不是字符串,就返回错误。 返回值:返回之前的旧值,如果之前Key不存在将返回nil。 该命令是原子操作。
get 命令: get获取key的值,如果存在,则返回;如果不存在,则返回nil;
del 命令: del删除key及key对应的值,如果key不存在,程序忽略
SET 命令: set key value [EX seconds] [PX milliseconds] [NX|XX] 将字符串值 value 关联到 key 如果 key 已经持有其他值, SET 就覆写旧值,无视类型。 对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。
可选参数从 Redis 2.6.12 版本开始,SET 命令的行为可以通过一系列参数来修改:
EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value
PX millisecond:设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value
XX:只在键已经存在时,才对键进行设置操作。
原来项目中使用分布式锁,整个逻辑:
获取锁
private boolean acquireLock(Jedis j,String lock) throws Exception{
int timeOut = timeoutSeconds*1000;
boolean acquired = false;
long start = System.currentTimeMillis();
int times = 0;
do {
String value = String.valueOf(System.currentTimeMillis() + timeOut + 1);
// 第一个得到这个锁
if (j.setnx(lock, value) == 1) {
logger.info("第一次获取全局锁:{} 成功", lock);
acquired = true;
break;
}
// j.expire(lock, timeoutSeconds); 网络抖动,可能失败
String currentValue = j.get(lock);
// 小于时,可能是上次没有清除,自上次超时后没有别的线程操作过
if (currentValue != null && Long.valueOf(currentValue) < System.currentTimeMillis()) {
// 这是同步操作,只会一个成功
String oldValue = j.getSet(lock, value);
// 别的线程没有赋上值,当前成功得到锁
if (oldValue != null && oldValue.equals(currentValue)) {
acquired = true;
logger.info("获取全局锁:{} 成功,尝试了{}次,经过了{}ms",lock,times,System.currentTimeMillis()-start);
break;
}
}
times++;
Thread.sleep(100);
} while (start + timeOut > System.currentTimeMillis());
if(!acquired){
logger.info("获取全局锁:{} 失败,尝试了{}次",lock,times);
}
return acquired;
}
解锁
private void releaseLock(Jedis j,String lock){
String currentValue = j.get(lock);
if(currentValue != null){
if(System.currentTimeMillis() < Long.valueOf(currentValue) ){
j.del(lock);
logger.info("释放锁{}",lock);
}
}
}
特地从多年前的项目中把这段代码找出来,当年写完,心里还挺美
网上有很多资料也是差不多样的,但事实并不那么完美,甚至是错误的
Long result = jedis.setnx(lockKey, value);
if (result == 1) {
// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
jedis.expire(lockKey, expireTime);
}
这个问题很明显,setnx与expire不是同一个事务,不俱备原子性;程序崩溃或者网络抖动都会出现死锁问题
当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖
这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的
有种错误改进,增加参数传入requestId
public static void releaseLock(Jedis jedis, String lockKey, String requestId) {
// 判断加锁与解锁是不是同一个客户端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此时,这把锁突然不是这个客户端的,则会误解锁
jedis.del(lockKey);
}
}
还是原子性的问题如代码注释,问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了
心里认为本来很简单的事,代码大概:
Lock lock = DistributedReentrantLock.newLock("testlock11");//定义testlock11为key的锁,默认可重入锁
if(lock.tryLock()){
try{
xxxxxx
}finally{
lock.unlock(); //释放testlock11为key的锁,释放需要放在finally里,防止出异常导致锁没有及时释放
}
}
为了提高性能,通过redis原子性接口SETNX:
结果为了弥补setnx()与expire()两个接口的原子性问题,引入了一堆问题,外强中干
Redis 2.6.12版本后,增强了set()命令
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time), 这个set()方法一共有五个入参:
高可用:
/**
* 释放分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)
使用eval()配置lua保证原子性
在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令
为什么需要一个有效时间呢?主要就是防止死锁
客户端需要设置接口访问超时,接口超时时间需要远远小于锁超时时间,比如锁自动释放的时间是10s,那么接口超时大概设置5-50ms
【虽然能解决问题,但时间设置成了难点,微服务中多少接口,而且接口的timeout都是可配置的,不能每次调整接口timeout时,还是考虑一下锁的timeout】
客户端1获得了锁,正准备处理共享资源的时候,发生了Full GC直到锁过期。这样,客户端2又获得了锁,开始处理共享资源。在客户端2处理的时候,客户端1 Full GC完成,也开始处理共享资源,这样就出现了2个客户端都在处理共享资源的情况
引入锁续约机制,也就是获取锁之后,释放锁之前,会定时进行锁续约,比如以3min间隔周期进行锁续约
这样如果应用重启了,最多3min等待时间,不会因为时间太长导致的死锁问题,也不会因为时间太短导致被其他线程抢占的问题,也就是锁分布式锁不需要设置过期时间,过期时间对于这个锁来说是滑动的
虾总给了总结性阐述:
首先启动Daemon线程,一直循环检测所有的分布式key,异步递延分布锁的过期时间,只要在处理业务逻辑,就递延分布锁过期时间3min。 每次添加分布式锁key,同时会生成一个uuid token,定义一个ConcurrentHashMap构造一个全局map维护所有的分布式key,上面Daemon线程会遍历这个map,每次解锁需要比对这个token,token一致才能解锁。 这样以来如果应用重启了,最多会有3min等待时间,不会导致时间太长导致的死锁问题,也不会因为时间太短导致的被其他线程抢占的问题,也就是锁分布式锁不需要设置过期时间,过期时间对于这个锁来说是滑动的
跟随虾总思路,找到了一个开源组件:Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
相对于平时使用的jedis,redission进行比较高的抽象
redission中的lock主要是RLock接口,继承的juc的Lock接口
public interface RLock extends Lock, RExpirable, RLockAsync
先看lock(),有两种形式,一个不带leaseTime,一个带leaseTime
public void lock() ;
public void lock(long leaseTime, TimeUnit unit) ;
边看源码,边解释
两个方法共用了lockInterruptibly()
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
尝试获取锁tryAcquire里面会用到两个核心方法tryAcquireAsync(),tryLockInnerAsync()
@Override
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel();
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
@Override
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
Boolean opStatus = future.getNow();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});
return result;
}
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = expirationRenewalMap.get(getEntryName());
if (task != null && (threadId == null || task.getThreadId() == threadId)) {
expirationRenewalMap.remove(getEntryName());
task.getTimeout().cancel();
}
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
使用cluster时
一个场景:A在向主机1请求到锁成功后,主机1宕机了。现在从机1a变成了主机。但是数据没有同步,从机1a是没有A的锁的。那么B又可以获得一个锁。这样就会造成数据错误。
redlock主要思想就是做数据冗余。建立5台独立的集群,当我们发送一个数据的时候,要保证3台(n/2+1)以上的机器接受成功才算成功,否则重试或报错
redlock实现会更复杂,但从他的算法上看,有zk选举的味道。对于更高可用分布锁,可以借助zk本身特性去实现
对于锁,主要考虑性能与安全,即要保持锁的活跃性,又得保证锁的安全性
分布式锁,除了以上两点,还要考虑实现时的三要素
对于redission,对于锁部分的源码,还有很多的内容,很多的细节需要挖掘,此篇就不写了,太长。
后面再结合JUC,写篇更详细的源码分析
Redis分布式锁的正确实现方式
redission