高并发时,同步调用应该去考量锁的性能损耗。能用无锁数据结构,就不要用锁;能锁区块,就不要锁整个方法体;能用对象锁,就不要用类锁。
分布式锁的目的:
保证同一时间只有一个客户端可以对共享资源进行操作。
分布式锁特点:
分布式锁服务,你需要考虑下面几个设计:
二、实现方式
2.1 单实例 Redis锁
Redis 2.6.12 版本开始支持
SET resource_name my_random_value NX PX 30000
至于解锁,为了防止客户端1 获得的锁,被客户端2 给释放,采用下面的Lua脚本来释放锁
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
在执行这段LUA脚本的时候,KEYS[1]的值为resource_name,ARGV[1]的值为my_random_value。原理就是先获取锁对应的value值,保证和客户端传进去的my_random_value值相等,这样就能避免自己的锁被其他人释放。另外,采取Lua脚本操作保证了原子性。
例如,下面的例子演示了不区分 Client 会导致的错误
通过执行上面脚本的方式释放锁,Client 的解锁操作只会解锁自己曾经加锁的资源,所以是安全的。
特意注意:由于释放锁涉及多个redis操作,考虑到并发问题,采用lua脚本打包执行。
2.2 新的分布式锁算法 Redlock
如果 Redis节点宕机了,服务不可用,所有客户端都无法获得锁。为了提高可用性,我们可以给这个Redis节点挂一个Slave,当Master节点挂了,系统自动切到Slave上(failover)。但由于主从复制(replication)是异步的,数据可能会丢失,导致丧失锁的安全性。下面会介绍一种升级版:Redis 的官方文档
Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。
算法原理:
redisson框架对 Redlock 算法封装。pom依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.3.2</version>
</dependency>
详细的获取锁、释放锁的API方法可以参考文章 链接
2.3 基于curator的zookeeper分布式锁(推荐这种方式)
基于ZooKeeper的锁和基于Redis的锁比较:
只能通过不断的轮询尝试获取锁
。而ZooKeeper支持watch
机制,获取锁失败之后等待锁释放的事件,这让客户端对锁的使用更加灵活。Maven依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
提供方法:
public class ZkDistributedLock {
// zookeeper地址
private String zkAddr;
// session超时时间
private int sessionTimeOutMs;
// zk名称空间
private String nameSpace;
private CuratorFramework cf;
private final ThreadLocal<Pair<InterProcessMutex, String>> threadLocal = new ThreadLocal<Pair<InterProcessMutex, String>>();
public ZkDistributedLock(String zkAddr, int sessionTimeOutMs, String nameSpace) {
this.zkAddr = zkAddr;
this.sessionTimeOutMs = sessionTimeOutMs;
this.nameSpace = nameSpace;
//1. 重试策略:重试时间为0s 重试3次 [默认重试策略:无需等待一直抢,抢3次]
RetryPolicy retryPolicy = new ExponentialBackoffRetry(0, 3);
//2. 通过工厂创建连接
cf = CuratorFrameworkFactory.builder()
.connectString(this.zkAddr)
.sessionTimeoutMs(this.sessionTimeOutMs)
.retryPolicy(retryPolicy)
.namespace(this.nameSpace)
.build();
//3. 开启连接
cf.start();
}
// 获取分布式锁
public boolean acquire(String lockKey) {
try {
InterProcessMutex lock = new InterProcessMutex(cf, "/" + lockKey);
lock.acquire();
threadLocal.set(new Pair<InterProcessMutex, String>(lock, lockKey));
return true;
} catch (Exception e) {
return false;
}
}
// 获取分布式锁(支持等待时间)
public boolean acquire(String lockKey, long time, TimeUnit unit) {
try {
InterProcessMutex lock = new InterProcessMutex(cf, "/" + lockKey);
if (lock.acquire(time, unit)) {
threadLocal.set(new Pair<InterProcessMutex, String>(lock, lockKey));
return true;
}
return false;
} catch (Exception e) {
return false;
}
}
// 释放锁
public void release() {
String lockKey = null;
try {
// 前线程中获取到pair 如果没有获取到锁 没有必要做释放
Pair<InterProcessMutex, String> pair = threadLocal.get();
if (pair == null) {
return;
}
InterProcessMutex lock = pair.getKey();
lockKey = pair.getValue();
if (lock == null) {
return;
}
if (!lock.isAcquiredInThisProcess()) {
return;
}
lock.release();
} catch (Exception e) {
} finally {
threadLocal.remove();
}
}
}