最近看到好多博主都在推分布式锁,实现方式很多,基于db、redis、zookeeper。zookeeper方式实现起来比较繁琐,这里我们就谈谈基于redis实现分布式锁的正确实现方式。
在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等。大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。 其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。
使用Redis实现分布式锁,有两个重要函数需要介绍。
SETNX key value
GETSET key value
GET key
DEL key [KEY …]
兵贵精,不在多。分布式锁,我们就依靠这四个命令。但在具体实现,还有很多细节,需要仔细斟酌,因为在分布式并发多进程中,任何一点出现差错,都会导致死锁,hold住所有进程。
SETNX 可以直接加锁操作,比如说对某个关键词foo加锁,客户端可以尝试 SETNX foo.lock <current unix time>
。
DEL foo.lock
命令来释放锁。理想是美好的,现实是残酷的。仅仅使用SETNX加锁带有竞争条件的,在某些特定的情况会造成死锁错误。
在上面的处理方式中,如果获取锁的客户端执行时间过长,进程被kill掉,或者因为其他异常崩溃,导致无法释放锁,就会造成死锁。所以,需要对加锁要做时效性检测。
因此,我们在加锁时,把当前时间戳作为value存入此锁中,通过当前时间戳和redis中的时间戳进行对比,如果超过一定差值,认为锁已经时效,防止锁无限期的锁下去。
但是,在大并发情况,如果同时检测锁失效,并简单粗暴的删除死锁,再通过SETNX上锁,可能会导致竞争条件的产生,即多个客户端同时获取锁。
情景描述如下:
此时C2和C3都获取了锁,产生竞争条件,如果在更高并发的情况,可能会有更多客户端获取锁。
所以,DEL锁的操作,不能直接使用在锁超时的情况下,幸好我们有GETSET方法,假设我们现在有另外一个客户端C4,看看如何使用GETSET方式,避免这种情况产生。
GETSET foo.lock
并得到foo.lock中老的时间戳T2。我们看到foo.lock的value值为时间戳,所以要在多客户端情况下,保证锁有效,一定要同步各服务器的时间。如果各服务器间,时间有差异,时间不一致的客户端,在判断锁超时,就会出现偏差,从而产生竞争条件。锁的超时与否,严格依赖时间戳。
现在唯一的问题是,C4设置foo.lock的新时间戳,是否会对C5获取得锁产生影响?
其实我们可以看到C4和C5只有在调用GET命令获得foo.lock的时间戳,通过比对时间戳,发现锁超时后,几乎同时调用GETSET方式获取锁,执行的时间差值极小,并且写入foo.lock中的都是有效时间戳,所以对锁并没有影响。
为了让这个锁更加强壮,获取锁的客户端,应该在调用关键业务时,再次调用GET方法获取T1,和写入的T0时间戳进行对比,以免锁因其他情况被执行DEL意外解开而不知。但是如果遇到上面描述得问题,则T0则会与T1不一致,当然差别一般会很小。这就是锁覆盖问题。
锁覆盖会导致什么问题呢?
当客户端的锁过期时间被覆盖,会造成锁不具有标识性,会造成客户端无法释放锁(客户端只能释放明确自己持有的锁)。
两种逻辑貌似都是OK,但是从逻辑处理上来说,当GET返回nil,表示锁是被删除的,而不是超时,应该走SETNX逻辑加锁。
对于"第二种走超时逻辑"是否会造成死锁,尚不清楚,不过推荐采用第一种方式。
前提:假设C4客户端获取锁后由于异常退出等原因未正常释放锁,导致锁超时。此时,C1、C2和C3客户端同时请求获取锁。C1、C2和C3客户端调用GET接口,C1返回T1,此时C3网络情况更好,快速进入获取锁,并执行DEL删除锁,C2返回T2(nil)。C1进入超时处理逻辑。C2面临上面提到「GET返回nil时应该走哪种逻辑?」的两种选择:1. 也进入超时处理逻辑;2. 继续循环走setnx逻辑(推荐);
至于为什么会出现这种情况?就如上面设想的场景那样,多客户端时,每个客户端连接redis后,发出的命令并不是连续的,导致从单客户端看到的好像连续的命令,到redis server后,这两条命令之间可能已经插入大量的其他客户端发出的命令,比如DEL,SETNX等。
正确的处理方式就是GETSET返回nil时,获取锁成功。
https://github.com/HuTu92/distributed-lock
package com.github.hutu92.concurrent.locks;
import com.alibaba.fastjson.JSON;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* Created by liuchunlong on 2018/8/31.
* <p>
* 基于redis的分布式锁 v1
*
* 需要客户端时间同步
*/
public class DistributedLock {
private static final long RETRY_BARRIER = 3 * 1000; // 请求锁重试屏障,单位毫秒
private final JedisPool jedisPool; // redis连接池
private final String lockKey; // lock Key
private final long lockExpiryInNanos; // 锁的过期时长,单位纳秒
private static final ThreadLocal<Lock> lockThreadLocal = new ThreadLocal<Lock>();
/**
* 构造方法
*
* @param jedisPool redis连接池
* @param lockKey 锁的Key
* @param lockExpiryInMillis 锁的过期时长,单位毫秒
*/
public DistributedLock(JedisPool jedisPool, String lockKey, long lockExpiryInMillis) {
this.jedisPool = jedisPool;
this.lockKey = lockKey;
this.lockExpiryInNanos = lockExpiryInMillis * 1000;
}
/**
* 构造方法
* <p>
* 使用锁默认的过期时长Integer.MAX_VALUE,即锁永远不会过期
*
* @param jedisPool redis连接池
* @param lockKey 锁的Key
*/
public DistributedLock(JedisPool jedisPool, String lockKey) {
this(jedisPool, lockKey, Integer.MAX_VALUE);
}
/**
* 获取锁在redis中的Key标记
*
* @return locks key
*/
public String getLockKey() {
return this.lockKey;
}
/**
* 锁的过期时长
*
* @return
*/
public long getLockExpiryInNanos() {
return lockExpiryInNanos;
}
/**
* 请求分布式锁,不会阻塞,直接返回
*
* @param jedis redis 连接
* @return 成功获取锁返回true, 否则返回false
*/
private boolean tryAcquire(Jedis jedis) {
final Lock newLock = new Lock(System.nanoTime() + this.lockExpiryInNanos);
/**
* 将新锁(newLock)写入redis中。如果成功写入,redis中不存在锁,获取锁成功;否则,redis中已存在锁,获取锁失败;
*/
if (jedis.setnx(this.lockKey, newLock.toString()) == 1) {
lockThreadLocal.set(newLock);
return true;
}
/**
* 至此,说明redis中已存在锁,获取锁失败,则需要进行如下操作:
* 1. 判断redis中已存在的锁是否过期,如果过期则直接获取锁;
* 2. 否则,获取锁失败;
*/
final String currentLockValue = jedis.get(lockKey);
// 特别的,当jedis.get()获取已存在的锁currentLockValue为空时,应该重新SETNX
if (currentLockValue == null || currentLockValue.length() == 0) {
tryAcquire(jedis);
}
final Lock currentLock = Lock.fromJson(currentLockValue); // redis中已存在的锁
// 如果redis中已存在的锁已超时,则重新获取锁
if (isExpired(currentLock)) {
String originLockValue = jedis.getSet(lockKey, newLock.toString());
/**
* 这里还有个前置条件:
* 会对已存在的锁进行校验,jedis.get()和jedis.getSet()获取的锁必须是同一锁,重新获取锁才成功
*/
// 特别的,当jedis.getSet()获取已存在的锁originLockValue为空时,则认定获取锁成功
if (originLockValue == null || originLockValue.length() == 0) {
lockThreadLocal.set(newLock);
return true;
}
if (originLockValue.equals(currentLockValue)) {
lockThreadLocal.set(newLock);
return true;
}
}
return false;
}
/**
* 请求分布式锁,不会阻塞,直接返回
*
* @return 成功获取锁返回true, 否则返回false
*/
public boolean tryAcquire() {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
return tryAcquire(jedis);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
/**
* 超时请求分布式锁,会阻塞
*
* 采用"自旋获取锁"的方式,直至获取锁成功或者请求锁超时
*
* @param acquireTimeoutInMillis 锁的请求超时时长
* @return
*/
public boolean acquire(long acquireTimeoutInMillis) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
long acquireTime = System.currentTimeMillis();
// 锁的请求到期时间
long expiryTime = System.currentTimeMillis() + acquireTimeoutInMillis;
while (expiryTime >= System.currentTimeMillis()) {
boolean result = tryAcquire(jedis);
if (result) { // 获取锁成功直接返回,否则循环重试
return true;
}
if ((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) {
Thread.yield();
}
}
} finally {
if (jedis != null) {
jedis.close();
}
}
return false;
}
/**
* 释放锁
*/
public void release() {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
release(jedis);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
/**
* 释放锁
*
* @param jedis
*/
private void release(Jedis jedis) {
Lock currlock = lockThreadLocal.get();
if (currlock != null) {
final String currentLockValue = jedis.get(lockKey);
if (currentLockValue != null && currentLockValue.length() != 0) {
final Lock currentLock = Lock.fromJson(currentLockValue); // redis中已存在的锁
if (currlock.equals(currentLock)) {
lockThreadLocal.remove();
jedis.del(lockKey);
}
}
}
}
/**
* 判断当前线程是否持有锁
*
* 未持有锁或者锁超时,返回false
*
* @return
*/
public boolean isLocked() {
Lock currlock = lockThreadLocal.get();
// 如果当前线程保存的lock不为null,并且未超时,则当前线程必然持有锁,锁未被意外释放
return currlock != null && !currlock.isExpired();
}
/**
* 判断指定的lock是否是当前线程持有的锁
*
* @return
*/
boolean isMine(final Lock lock) {
Lock currlock = lockThreadLocal.get();
return currlock != null && currlock.equals(lock);
}
/**
* 判断锁是否超时
*
* @param lock
* @return
*/
boolean isExpired(final Lock lock) {
return lock.isExpired();
}
/**
* 锁
*/
protected static class Lock {
private long expiryTime; // 锁的过期时间,注意,不是过期时长,单位纳秒
Lock(long expiryTime) {
this.expiryTime = expiryTime;
}
/**
* 解析字符串,根据解析出的过期时间构造Lock
*
* @param json
* @return
*/
static Lock fromJson(String json) {
return JSON.parseObject(json, Lock.class);
}
@Override
public String toString() {
return JSON.toJSONString(this, false);
}
public long getExpiryTime() {
return expiryTime;
}
/**
* 判断锁是否超时,如果锁的过期时间小于当前系统时间,则判定锁超时
*
* @return
*/
boolean isExpired() {
return this.expiryTime < System.nanoTime();
}
@Override
public boolean equals(Object obj) {
return obj != null
&& obj instanceof Lock
&& this.expiryTime == ((Lock) obj).getExpiryTime();
}
}
}
上面存在的锁覆盖问题是不可避免的,还有就是要求客户端时间同步。下面我们进一步优化这一问题。
SET key value [EX seconds] [PX milliseconds] [NX|XX]
SET key value EX second
效果等同于 SETEX key second value
。SET key value PX millisecond
效果等同于 PSETEX key millisecond value
。SET key value NX
效果等同于 SETNX key value
。因为 SET 命令可以通过参数来实现和 SETNX 、 SETEX 和 PSETEX 三个命令的效果,所以将来的 Redis 版本可能会废弃并最终移除 SETNX 、 SETEX 和 PSETEX 这三个命令。
命令 SET resource-name anystring NX EX max-lock-time
是一种在 Redis 中实现锁的简单方法。
客户端执行以上的命令:
设置的过期时间到达之后,锁将自动释放。
可以通过以下修改,让这个锁实现更健壮:
这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。
以下是一个简单的解锁脚本示例:
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
package com.github.hutu92;
import com.alibaba.fastjson.JSON;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by liuchunlong on 2018/9/4.
* <p>
* 基于redis的分布式锁 v2
* <p>
* 不需要客户端时间同步
*/
public class DistributedLock {
private static final long RETRY_BARRIER = 600; // 重试屏障,单位毫秒
private static final long INTERVAL_TIMES = 200; // 下一次重试等待,单位毫秒
private final JedisPool jedisPool; // redis连接池
private final String lockKey; // lock Key
private final long lockExpiryInMillis; // 锁的过期时长,单位纳秒
private final ThreadLocal<Lock> lockThreadLocal = new ThreadLocal<Lock>();
/**
* 构造方法
*
* @param jedisPool redis连接池
* @param lockKey 锁的Key
* @param lockExpiryInMillis 锁的过期时长,单位毫秒
*/
public DistributedLock(JedisPool jedisPool, String lockKey, long lockExpiryInMillis) {
this.jedisPool = jedisPool;
this.lockKey = lockKey;
this.lockExpiryInMillis = lockExpiryInMillis;
}
/**
* 构造方法
* <p>
* 使用锁默认的过期时长Integer.MAX_VALUE,即锁永远不会过期
*
* @param jedisPool redis连接池
* @param lockKey 锁的Key
*/
public DistributedLock(JedisPool jedisPool, String lockKey) {
this(jedisPool, lockKey, Integer.MAX_VALUE);
}
/**
* 获取锁在redis中的Key标记
*
* @return locks key
*/
public String getLockKey() {
return this.lockKey;
}
/**
* 锁的过期时长
*
* @return
*/
public long getLockExpiryInMillis() {
return lockExpiryInMillis;
}
/**
* can override
*
* @param jedis
* @return
*/
private String nextUid(Jedis jedis) {
// 可以考虑雪花算法..
return UUID.randomUUID().toString();
}
private synchronized Jedis getClient() {
return jedisPool.getResource();
}
private synchronized void closeClient(Jedis jedis) {
jedis.close();
}
/**
* 请求分布式锁,不会阻塞,直接返回
*
* @param jedis redis 连接
* @return 成功获取锁返回true, 否则返回false
*/
private boolean tryAcquire(Jedis jedis) {
final Lock nLock = new Lock(nextUid(jedis));
String result = jedis.set(this.lockKey, nLock.toString(), "NX", "PX", this.lockExpiryInMillis);
if ("OK".equals(result)) {
lockThreadLocal.set(nLock);
return true;
}
return false;
}
/**
* 请求分布式锁,不会阻塞,直接返回
*
* @return 成功获取锁返回true, 否则返回false
*/
public boolean tryAcquire() {
Jedis jedis = null;
try {
jedis = getClient();
return tryAcquire(jedis);
} finally {
if (jedis != null) {
closeClient(jedis);
}
}
}
/**
* 超时请求分布式锁,会阻塞
*
* 采用"自旋获取锁"的方式,直至获取锁成功或者请求锁超时
*
* @param acquireTimeoutInMillis 锁的请求超时时长
* @return
*/
public boolean acquire(long acquireTimeoutInMillis) throws InterruptedException {
Jedis jedis = null;
try {
jedis = getClient();
long acquireTime = System.currentTimeMillis();
long expiryTime = System.currentTimeMillis() + acquireTimeoutInMillis; // 锁的请求到期时间
while (expiryTime >= System.currentTimeMillis()) {
boolean result = tryAcquire(jedis);
if (result) { // 获取锁成功直接返回,否则循环重试
return true;
}
Thread.sleep(INTERVAL_TIMES);
}
} finally {
if (jedis != null) {
closeClient(jedis);
}
}
return false;
}
/**
* 释放锁
*
* @return
*/
public boolean release() throws InterruptedException {
return release(Integer.MAX_VALUE);
}
/**
* 释放锁
*
* @return
*/
public boolean release(long releaseTimeoutInMillis) throws InterruptedException {
Jedis jedis = null;
try {
jedis = getClient();
return release(jedis, releaseTimeoutInMillis);
} finally {
if (jedis != null) {
closeClient(jedis);
}
}
}
/**
* 释放锁
*
* @param jedis
* @param releaseTimeoutInMillis
* @return
*/
private boolean release(Jedis jedis, long releaseTimeoutInMillis) throws InterruptedException {
Lock cLock = lockThreadLocal.get();
if (cLock == null) {
System.out.println("lock is null!");
}
if (cLock != null) {
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
long releaseTime = System.currentTimeMillis();
long expiryTime = System.currentTimeMillis() + releaseTimeoutInMillis; // 锁的释放到期时间
while (expiryTime >= System.currentTimeMillis()) {
Object result = jedis.eval(luaScript, Collections.singletonList(this.lockKey),
Collections.singletonList(cLock.toString()));
if (((Long) result) == 1L) {
lockThreadLocal.remove();
return true;
}
Thread.sleep(INTERVAL_TIMES);
}
}
return false;
}
/**
* 锁
*/
protected static class Lock {
private String uid; // lock 唯一标识
Lock(String uid) {
this.uid = uid;
}
public String getUid() {
return uid;
}
@Override
public String toString() {
return JSON.toJSONString(this, false);
}
}
}
这里我们使用ab性能测试工具来模拟测试。
由于没有使用队列,对高并发请求进行削峰,所以所有的压力都会被打到redis上。为了测试方便我这里只是本地启动了单机redis,没有做其它的调优配置。
我们并发测试场景是1000个并发请求,总共2000个请求。
ab -n 2000 -c 1000 "localhost:8080/lock/v2/seckill"
上述的地址是一个接口,接口代码如下:
@RestController
@RequestMapping("/lock")
public class LockController {
private static LongAdder longAdder = new LongAdder();
private static Long ACQUIRE_TIMEOUT_IN_MILLIS = (long) Integer.MAX_VALUE;
private static Long stock = 100000L;
private static DistributedLock lock;
static {
longAdder.add(stock);
}
private final JedisPool jedisPool;
@Autowired
public LockController(JedisPool jedisPool) {
this.jedisPool = jedisPool;
lock = new DistributedLock(jedisPool, "seckillV2_" + UUID.randomUUID().toString());
}
@GetMapping("/v2/seckill")
public String seckillV2() throws InterruptedException {
boolean acquireResult = false;
try {
acquireResult = lock.acquire(ACQUIRE_TIMEOUT_IN_MILLIS);
if (!acquireResult) {
return "人太多了,换个姿势操作一下!";
}
if (longAdder.longValue() == 0L) {
return "已抢光!";
}
doSomeThing(jedisPool);
longAdder.decrement();
System.out.println("已抢: " + (stock - longAdder.longValue()) + ", 还剩下: " + longAdder.longValue());
} finally {
if (acquireResult) {
boolean releaseResult = lock.release();
if (!releaseResult) {
System.out.println("释放锁失败!");
}
}
}
return "OK";
}
private void doSomeThing(JedisPool jedisPool) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.incr("already_bought");
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
那么我们这里说的性能调优指的是什么呢?
仔细分析上面的源码你会发现,获取锁的逻辑是循环获取的,再每次循环之间,应该怎么去处理?如果不做任何处理,直接继续下一个循环,表面上看能够及时的获取锁,但这会给redis更大的压力,如果redis扛不住,到最后只会适得其反;而如果sleep等待,那么等待多久呢?等待久了,锁的获取和释放就会不及时;使用yield如何?等等
if ((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) {
Thread.yield();
}
请求获取锁的前600毫秒内直接循环重试,如果超过600毫秒还未获取到锁则每次循环都将线程推迟到下一个时间片执行。
carbon
主要参数说明:
if ((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) {
Thread.sleep(INTERVAL_TIMES);
} else {
Thread.yield();
}
请求获取锁的前600毫秒内每次循环重试都先将线程推迟到下一个时间片,如果超过600毫秒还未获取到锁则每次循环都将线程休眠200毫秒。
carbon -1-
很明显,出错率降低了很多,每个请求的耗时也减少了一半,这是因为,No1中在600毫秒内的直接循环重试,会产生很多意义的请求,给redis造成了巨大的压力,无法响应请求。
Thread.sleep(INTERVAL_TIMES);
请求获取锁的每次循环重试都将线程休眠200毫秒。
carbon -2-
Thread.sleep(INTERVAL_TIMES * 10);
请求获取锁的每次循环重试都将线程休眠2秒。
carbon -3-
很明显,休眠时间过长,会使部分线程请求锁的时间变长,不能够及时获取到锁。
Thread.yield();
请求获取锁的每次循环重试都将线程推迟到下一个时间片执行。
carbon -4-
总的来说,No2与No3表现的都还可以。但是No2使用了Thread.yield();
也会给redis造成压力,我可以对比下两者的 Percentage of the requests served within a certain time (ms) 数据。可以看到No3的90%以下请求的用户平均时间要明显低于No2的。所以最终我们选择No3策略。
当然你也可以根据你的redis的QPS自行调整策略。