我们常常将缓存作为分布式锁的解决方案,但是却不能单纯的判断某个 key 是否存在 来作为锁的获得依据,因为无论是 exists 和 get 命名都不是线程安全的,都无法保证只有一个线程可以获得锁,存在线程争抢,可能会有多个线程同时拿到锁的情况(经典的 Redis “读后写”的问题)。
@Component
public class LockClient {
private StringRedisTemplate stringRedisTemplate;
private ValueOperations<String, String> valueOperations;
@Autowired
public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
this.valueOperations = stringRedisTemplate.opsForValue();
}
public void lockIncr() {
Long lockIncr = valueOperations.increment("lockIncr", 1);
// 说明拿到了锁
if (lockIncr == 1) {
// 业务操作
}
}
}
上面的锁实现方式,我们对资源做了隔离,保证只有唯一线程可以拿到资源并执行操作。但是如果资源并不是唯一线程执行的呢?存在多个线程争抢的情况下呢?
public void lockSetnx() {
String lock = "lockSetnx";
long millis = System.currentTimeMillis();
long timeout = millis + 3000L + 1;
try {
while (true) {
boolean setnx = valueOperations.setIfAbsent(lock, timeout + "");
if (setnx == true) {
break;
}
String oldTimeout = valueOperations.get(lock);
// 这一步是为了解决客户端异常宕机,锁没有被正常释放的时候。
// 当 p1、p2 同时执行到这里,发现锁的时间过期了。p1、p2 同时执行 getSet 命令。
// 假设 p1 先执行成功了,那么 p1 得到的值就是原来锁的过期时间(可以符合下面的判断式),表示争抢锁成功。
// 假设 p2 后执行成功了,那么 p2 得到的值就是 p1 set 进去的值(不会符合下面的表达式),表示争抢锁失败。
String oldValue = valueOperations.getAndSet(lock, timeout + "");
if (millis > Long.valueOf(oldTimeout) && millis > Long.valueOf(oldValue)) {
break;
}
// 休眠 100 毫秒,再去争抢锁
Thread.sleep(100);
}
// 执行业务代码
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (millis < timeout) {
stringRedisTemplate.delete(lock);
}
}
}
zookeeper,天生的分布式协调工具,生来就是为了解决各种分布式的难题,比如分布式锁、分布式计数器、分布式队列等等。 zookeeper 分布式锁,如果自己实现的话,大抵的实现方式如下:
幸运的是,zookeeper recipes 客户端为我们提供了多种分布式锁实现:
zookeeper recipes 锁的简单使用:
public InterProcessMutex interProcessMutex(String lockPath) {
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeper, new ExponentialBackoffRetry(1000, 3));
// 启用命名空间,做微服务间隔离
client.usingNamespace(namespace);
client.start();
return new InterProcessMutex(client, lockPath);
}
public void lockUse() {
InterProcessMutex interProcessMutex = interProcessMutex("/lockpath");
try {
// 获取锁
if (interProcessMutex.acquire(100, TimeUnit.MILLISECONDS)) {
// 执行业务代码
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
try {
interProcessMutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}