今天来介绍下的是分布式锁的实现。
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的JavaAPI并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
在一个应用中,通常存在着不同的模式,这几种分布式锁需要结合使用,将一种分布式锁不加区别的应用到所有场景将导致较差的效果。因此,我们分别讨论这三种方式的具体实现,以及容易出现的误用。
Oracle乐观锁
在oracle中实现乐观锁很简单,只要在相应的数据上增加一个版本控制,例如version,每次读出来的时候,把该字段也读出来,当写回去时,把该字段加1,提交之前跟数据库的该字段比较一次,如果比数据库的值大的话,就允许保存,否则不允许保存。如下所示:
会话1操作如下:
select id, balance, version from account where id="1";
查询结果:id=1, balance=1000, version=1
update account
set balance=balance+100, version=version+1
where id="1" and version=1;
select id, balance, version from account where id="1";
查询结果:id=1, balance=1100, version=2
会话2操作如下:
select id, balance, version from account where id="1";
查询结果:id=1, balance=1000, version=1
update account
set balance=balance-50, version=version+1
where id="1" and version=1;
select id, balance, version from account where id="1";
查询结果:id=1, balance=1100, version=2
会话1已修改成功,实际account.balance=1100、account.version=2,会话2也将版本号加1(version=2)试图向数据库提交数据(balance=950),但此时比对数据库记录版本时发现,操作员B提交的数据版本号为2,数据库记录当前版本也为2,不满足提交版本必须大于记录当前版本才能执行更新的乐观锁策略,因此会话2的提交不会生效。
Redis分布式锁
使用Redis使用分布锁时,主要有两种方式,一种是基于原生的Redis客户端,使用setnx、getset以及expire等实现;另一种方式使用Redisson,它们就像Zookeeper的原生API与ApacheCurator。早期开发人员一般使用Redis客户端自己实现,这两年很多用户采用了Redission库。
1.基于Jedis的分布式锁
首先通过Maven引入Jedis开源组件,在pom.xml文件加入如下依赖:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
实现如下:
public class RedisUtil {
private static finalString LOCK_SUCCESS = "OK";
private static finalString SET_IF_NOT_EXIST = "NX";
private static finalString SET_WITH_EXPIRE_TIME = "PX";
/**
* 尝试获取分布式锁。加锁代码满足我们可靠性里描述的三个条件。首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public booleantryGetDistributedLock(Jedis jedis, String lockKey, String requestId, intexpireTime) {
try (Jedis jedis = jedisPool.getResource()){
String result = jedis.set(namespace +lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
private staticfinal Long RELEASE_SUCCESS = 1L;
/**
* 释放分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static booleanreleaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script ="if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
try (Jedis jedis =jedisPool.getResource()) {
Object result = jedis.eval(script,Collections.singletonList(namespace + lockKey),Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
}
2.基于Redisson的分布式锁
Redisson(https://redisson.org/)是Redis官网推荐的Java语言实现分布式锁的项目,支持各种部署架构的redis,包括单机、集群(Cluster)、哨兵(Sentinel)以及主从(Master/Slave),广泛被各大厂使用。当然Redisson的功能远不止分布式锁,还包括其他一些分布式结构,但是本节主要讨论其分布式锁实现。Redisson实现了JDK中的Lock接口,用法和JDK的锁很类似,只不过Redssion的锁是分布式实现。其伪代码如下:
RLock lock = redisson.getLock("MyLockName");
lock.lock();
try {
// do sth.
} finally {
lock.unlock();
}
首先引入Redisson库依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.2.3</version>
</dependency>
连接管理器实现如下:
// Redission连接管理器
public class RedissonManager {
private static finalString RAtomicName = "genId_";
private static Configconfig = new Config();
private static Redissonredisson = null;
public static void init(){
try {
config.useClusterServers() //这是用的集群server
.setScanInterval(2000) //设置集群状态扫描时间
.setMasterConnectionPoolSize(10000) //设置连接数
.setSlaveConnectionPoolSize(10000)
.addNodeAddress("127.0.0.1:6379","127.0.0.1:6380");
redisson =Redisson.create(config);
}catch (Exception e){
e.printStackTrace();
}
}
public static RedissongetRedisson(){
return redisson;
}
}
基于Redisson的分布式锁工具类实现:
public class DistributedRedisLock {
private static Redissonredisson = RedissonManager.getRedisson();
private static finalString LOCK_TITLE = "redisLock_";
public static voidacquire(String lockName){
String key =LOCK_TITLE + lockName;
RLock mylock =redisson.getLock(key);
mylock.lock(2,TimeUnit.MINUTES); //lock提供带timeout参数,timeout结束强制解锁,防止死锁
System.err.println("======lock======"+Thread.currentThread().getName());
}
public static voidrelease(String lockName){
String key =LOCK_TITLE + lockName;
RLock mylock =redisson.getLock(key);
mylock.unlock();
System.err.println("======unlock======"+Thread.currentThread().getName());
}
// 测试
private static void redisLock(){
RedissonManager.init(); //初始化
for (int i = 0; i <100; i++) {
Thread t = newThread(new Runnable() {
@Override
public voidrun() {
try {
Stringkey = "test123";
DistributedRedisLock.acquire(key);
Thread.sleep(1000); //获得锁之后可以进行相应的处理
System.err.println("======获得锁后进行相应的操作======");
DistributedRedisLock.release(key);
System.err.println("=============================");
} catch(Exception e) {
e.printStackTrace();
}
}
});
t.start();
}
}
}
Zookeeper分布式锁
有序性是Zookeeper中非常重要的一个特性,所有的更新都是全局有序,每个更新都有唯一的时间戳,该时间戳称为zxid(Zookeeper Transaction Id)。读请求只会相对于更新有序,也就是读请求的返回结果中会带有这个最新的zxid。在正式讨论如何使用Zookeeper实现分布式锁前,我们需要先了解下Zookeeper中关于节点的几个特性:
l 有序节点:假如当前有一个父节点为/hs/lock,我们可以在这个父节点下面创建子节点;Zookeeper提供了一个可选的有序特性,例如我们可以创建子节点/hs/lock/node-并且指明有序,那么Zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,也就是说如果是第一个创建的子节点,那么生成的子节点为/hs/lock/node-0000000000,下一个节点则为/hs/lock/node-0000000001,依次类推。
l 临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,Zookeeper会自动删除该节点。
l 事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,Zookeeper会通知客户端。当前Zookeeper有如下四种事件:节点创建、节点删除、节点数据修改、子节点变更。
这几个特性是Zookeeper的关键特性,也是实现分布式锁的关键,读者应确保好好领会。下面描述使用Zookeeper实现分布式锁的算法流程,假设锁空间的根节点为/hs/lock:
(1)客户端连接Zookeeper,并在/hs/lock下创建临时的且有序的子节点,第一个客户端对应的子节点为/hs/lock-0000000000,第二个为/hs/lock-0000000001,以此类推。
(2)客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁。
(3)执行业务代码。
(4)完成业务流程后,删除对应的子节点释放锁。
虽然Zookeeper原生客户端暴露的API已经比较简洁,但是友好性并不好,实现各种实际功能比较繁琐,分布式锁也不例外,所以实际中一般直接使用curator开源项目提供的Zookeeper分布式锁实现。如下所示,首先引入依赖:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
<!--4.0.0原生不兼容zk 3.4, 必须进行兼容性处理,否则会报KeeperErrorCode = Unimplemented异常-->
<exclusions>
<exclusion>
<groupId>org.apache.Zookeeper</groupId>
<artifactId>Zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
下面实现基于Curator的分布式锁工具类:
package chapter6.sec63;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorDLock {
public static voidmain(String[] args) throws Exception {
// 创建Zookeeper的客户端
RetryPolicyretryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFrameworkclient = CuratorFrameworkFactory.newClient("9.20.38.223:2181",retryPolicy);
client.start();
// 创建分布式锁, 锁空间的根节点路径为/hs/lock
InterProcessMutexmutex = new InterProcessMutex(client, "/hs/lock");
mutex.acquire();
// 获得了锁, 进行业务流程
System.out.println("Entermutex");
// 完成业务流程, 释放锁
mutex.release();
// 关闭客户端
client.close();
}
}