前面介绍了 Zookeeper 集群 ZAB 协议、配置中心、注册中心、数据与存储、会话与事务管理等相关的知识点,今天我将详细的为大家介绍 zookeeper 分布式锁相关知识,希望大家能够从中收获多多!如有帮助,请点在看、转发支持一波!!!
在平时我们对锁的使用,在针对单个服务,我们可以用 Java 自带的一些锁来实现,资源的顺序访问,但是随着业务的发展,现在基本上公司的服务都是多个,单纯的 Lock或者 Synchronize 只能解决单个JVM线程的问题,那么针对于单个服务的 Java 的锁是无法满足我们业务的需要的,为了解决多个服务跨服务访问共享资源,于是就有了分布锁,分布式锁产生的原因就是集群。
在分布式系统中,多个进程或节点可能需要同时访问共享资源。为了确保数据一致性和并发控制,需要使用分布式锁来协调这些进程或节点之间的访问。分布式锁可以让每个进程或节点按照特定的规则访问共享资源,从而避免冲突和竞争条件的发生。
下图就是一个分布式锁的常见应用案例。
分布式锁的实现方式主要以(ZooKeeper、Reids、Mysql)这三种为主。
今天我们主要讲解的是使用 ZooKeeper来实现分布式锁,ZooKeeper的应用场景主要包含这几个方面:
ZooKeeper实现分布式锁,主要是得益于ZooKeeper 保证了数据的强一致性,锁的服务可以分为两大类:
保持独占所有试图来获取当前锁的客户端,最终有且只有一个能够成功得到当前锁的钥匙,通常我们会把 ZooKeeper 上的节点(ZNode)看做一把锁,通过 create 临时节点的方式来实现,当多个客户端都去创建一把锁的时候,那么只有成功创建了那个客户端才能拥有这把锁。
控制时序所有试图获取锁的客户端,都是被顺序执行,只是会有一个序号(zxid),我们会有一个节点,例如:/testLock,所有临时节点都在这个下面去创建,ZK的父节点(/testLock) 维持了一个序号,这个是ZK自带的属性,他保证了子节点创建的时序性,从而也形成了每个客户端的一个全局时序。
ZooKeeper的分布式锁是基于ZooKeeper提供的有序节点(Sequential Nodes)和 Watch 机制实现的。具体实现步骤如下:
因为ZooKeeper的有序节点是按照创建的顺序排序的,所以可以通过监听前一个节点的变化来实现获取锁。当一个进程或节点需要获取锁时,它会在ZooKeeper上创建一个有序节点,并获取所有有序节点中的最小值。如果当前节点是最小值,则表示该进程或节点已经获取到锁;否则,该进程或节点需要监听前一个节点的变化,等待前一个节点释放锁后再次尝试获取锁。
ZooKeeper的分布式锁具有以下优点:
但是,ZooKeeper的分布式锁也存在一些局限性:
使用ZooKeeper实现分布式锁的基本流程如下:
/locks/lock_node/lock_000000001
,同时设置watcher事件,监控它的前一个节点。需要注意的是,分布式锁的实现还需要处理以下问题:
因此,在实现分布式锁时,需要考虑锁的可靠性
、高效性
和容错性
,并对异常情况进行处理,以确保锁的正确性和系统的稳定性。
另外 ZooKeeper 还提供了一种基于临时节点的分布式锁机制,这种锁被称为“短暂节点锁”。使用短暂节点锁时,每个客户端进程会在 ZooKeeper 上创建一个临时节点,并在该节点上注册一个 Watcher 来监听该节点。当客户端进程需要获取锁时,它会在指定的 ZooKeeper 节点下创建一个短暂节点。如果该节点的序号是当前所有节点中最小的,则该客户端进程获得锁;否则,该进程需要等待,直到 Watcher 监听到节点被删除为止。
短暂节点锁的优点是它不会出现羊群效应,而且当进程失去锁时,它所创建的短暂节点会被自动删除,这可以有效减少ZooKeeper上的数据量。不过,它的缺点是每个客户端都需要创建一个短暂节点,如果客户端数量很多,ZooKeeper上的节点数量可能会很快增加,从而导致性能下降。
ZooKeeper的分布式锁机制可以通过不同的实现方式来满足不同的需求。开发者需要根据实际情况选择适合自己的锁实现方式,以实现高效、可靠的分布式系统。其中包含了分布式锁的实现。使用ZooKeeper实现分布式锁可以避免多个节点同时操作共享资源的问题,确保数据的一致性和可靠性。
在ZooKeeper中,分布式锁的实现基于临时节点和Watch机制,同时需要实现两个基本操作:加锁
和释放锁
。 具体实现方法有两种:
无论哪种实现方法,都需要处理锁竞争的情况和节点异常退出的情况,以确保锁的正确性和可靠性。分布式锁的实现需要考虑多个因素,包括锁的粒度、锁的持有时间、锁的竞争方式等,需要根据具体应用场景进行调整和优化。
由于zookeeper获取链接是一个耗时过程,这里可以在项目启动时,初始化链接,并且只初始化一次。借助于spring特性,代码实现如下:
@Component
public class zkClient {
private static final String connectString = "192.168.107.135";
private static final String ROOT_PATH = "/distributed";
private ZooKeeper zooKeeper;
@PostConstruct
public void init() throws IOException {
this.zooKeeper = new ZooKeeper(connectString, 30000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("zookeeper 获取链接成功");
}
});
//创建分布式锁根节点
try {
if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
this.zooKeeper.create(ROOT_PATH, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@PreDestroy
public void destroy() {
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 初始化分布式对象方法
*/
public ZkDistributedLock getZkDistributedLock(String lockname){
return new ZkDistributedLock(zooKeeper,lockname);
}
}
代码落地
public class ZkDistributedLock {
public static final String ROOT_PATH = "/distribute";
private String path;
private ZooKeeper zooKeeper;
public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
this.zooKeeper = zooKeeper;
this.path = ROOT_PATH + "/" + lockname;
}
public void lock() {
try {
zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(200);
lock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void unlock(){
try {
this.zooKeeper.delete(path,0);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
改造StockService的checkAndLock方法:
@Autowired
private zkClient client;
public void checkAndLock() {
// 加锁,获取锁失败重试
ZkDistributedLock lock = this.client.getZkDistributedLock("lock");
lock.lock();
// 先查询库存是否充足
Stock stock = this.stockMapper.selectById(1L);
// 再减库存
if (stock != null && stock.getCount() > 0) {
stock.setCount(stock.getCount() - 1);
this.stockMapper.updateById(stock);
}
lock.unlock();
}
性能一般,mysql数据库的库存余量为0(注意:所有测试之前都要先修改库存量为5000)。
接下来首先来提高性能。
基本实现中由于无限自旋影响性能:
试想:每个请求要想正常的执行完成,最终都是要创建节点,如果能够避免争抢必然可以提高性能。这里借助于zk的临时序列化节点,实现分布式锁:
public class ZkDistributedLock {
public static final String ROOT_PATH = "/distribute";
private String path;
private ZooKeeper zooKeeper;
public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
this.zooKeeper = zooKeeper;
try {
this.path = zooKeeper.create(ROOT_PATH + "/" + lockname + "_",
null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void lock() {
String preNode = getpreNode(path);
//如果该节点没有前一个节点,说明该节点是最小的节点
if (StringUtils.isEmpty(preNode)) {
return;
}
//重新检查是否获取到锁
try {
Thread.sleep(20);
lock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取指定节点的前节点
*
* @param path
* @return
*/
private String getpreNode(String path) {
//获取当前节点的序列化序号
Long curSerial = Long.valueOf(StringUtil.substringAfter(path, '_'));
//获取根路径下的所有序列化子节点
try {
List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
//判空处理
if (CollectionUtils.isEmpty(nodes)) {
return null;
}
//获取前一个节点
Long flag = 0L;
String preNode = null;
for (String node : nodes) {
//获取每个节点的序列化号
Long serial = Long.valueOf(StringUtil.substringAfter(path, '_'));
if (serial < curSerial && serial > flag) {
flag = serial;
preNode = node;
}
}
return preNode;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
public void unlock() {
try {
this.zooKeeper.delete(path, 0);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
主要修改了构造方法和lock方法:
并添加了getPreNode获取前置节点的方法。
测试结果如下:
性能反而更弱了。
原因:虽然不用反复争抢创建节点了,但是会自选判断自己是最小的节点,这个判断逻辑反而更复杂更 耗时。
对于这个算法有个极大的优化点:假如当前有1000个节点在等待锁,如果获得锁的客户端释放锁时,这1000个客户端都会被唤醒,这种情况称为“羊群效应”;在这种羊群效应中,zookeeper需要通知1000个 客户端,这会阻塞其他的操作,最好的情况应该只唤醒新的最小节点对应的客户端。应该怎么做呢?在 设置事件监听时,每个客户端应该对刚好在它之前的子节点设置事件监听,例如子节点列表 为/lock/lock-0000000000、/lock/lock-0000000001、/lock/lock-0000000002,序号为1的客户端监听 序号为0的子节点删除消息,序号为2的监听序号为1的子节点删除消息。
public void lock() {
String preNode = getpreNode(path);
//如果该节点没有前一个节点,说明该节点是最小的节点
if (StringUtils.isEmpty(preNode)) {
return;
} else {
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, watchedEvent -> {
countDownLatch.countDown();
}) == null) {
return;
}
countDownLatch.await();
return;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock();
}
}
压力测试效果如下:
由此可见性能提高不少仅次于redis的分布式锁。
引入ThreadLocal线程局部变量保证zk分布式锁的可重入性。
在对应的线程的存储数据。
public class ZkDistributedLock {
public static final String ROOT_PATH = "/distribute";
private String path;
private ZooKeeper zooKeeper;
private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();
public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
this.zooKeeper = zooKeeper;
try {
this.path = zooKeeper.create(ROOT_PATH + "/" + lockname + "_",
null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void lock() {
Integer flag = THREAD_LOCAL.get();
if (flag != null && flag > 0) {
THREAD_LOCAL.set(flag + 1);
return;
}
String preNode = getpreNode(path);
//如果该节点没有前一个节点,说明该节点是最小的节点
if (StringUtils.isEmpty(preNode)) {
return;
} else {
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, watchedEvent -> {
countDownLatch.countDown();
}) == null) {
return;
}
countDownLatch.await();
THREAD_LOCAL.set(1);
return;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock();
}
}
/**
* 获取指定节点的前节点
*
* @param path
* @return
*/
private String getpreNode(String path) {
//获取当前节点的序列化序号
Long curSerial = Long.valueOf(StringUtil.substringAfter(path, '_'));
//获取根路径下的所有序列化子节点
try {
List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
//判空处理
if (CollectionUtils.isEmpty(nodes)) {
return null;
}
//获取前一个节点
Long flag = 0L;
String preNode = null;
for (String node : nodes) {
//获取每个节点的序列化号
Long serial = Long.valueOf(StringUtil.substringAfter(path, '_'));
if (serial < curSerial && serial > flag) {
flag = serial;
preNode = node;
}
}
return preNode;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
public void unlock() {
try {
THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);
if (THREAD_LOCAL.get() == 0) {
this.zooKeeper.delete(path, 0);
THREAD_LOCAL.remove();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
我们知道了 Redis 主要是通过 setnx 命令实现分布式锁,Zookeeper 采用临时节点和事件监听机制可以实现分布式锁。那么这两种方式有哪些关键的区别呢?
这样看好像ZooKeeper比Redis更胜一筹,但Redis提供的API和库更加丰富,在很大程度上能够减少开发的工作量。而且如果是小规模的项目,已经部署了Redis,可能没太大必要去再部署一套ZooKeeper集群去实现分布式锁,大家根据场景自行选择。
参考文章:https://blog.csdn.net/polsnet/article/ details/130444403 https://blog.csdn.net/m0_62436868 /article/details/13046561
推荐阅读 点击标题可跳转