ZooKeeper是一个典型的分布式数据一致性解决方案,特点如下:
设计目标:
主要应用场景如下:
在Zookeeper中,ZNode可以分为持久节点和临时节点两类。所谓持久节点是指一旦这个ZNode被创建了,除非主动进行ZNode的移除操作,否则这个ZNode将一直保存在Zookeeper上。而临时节点就不一样了,它的生命周期和客户端会话绑定,一旦客户端会话失效,那么这个客户端创建的所有临时节点都会被移除。另外,ZooKeeper还允许用户为每个节点添加一个特殊的属性:SEQUENTIAL.一旦节点被标记上这个属性,那么在这个节点被创建的时候,Zookeeper会自动在其节点名后面追加上一个整型数字,这个整型数字是一个由父节点维护的自增数字。
// 查看节点
ls /test
// stat可以查看节点的一些详细信息
stat /test
// ls2相当于是ls和stat两个命令的组合
ls2 /test
// 创建节
create /test test
// 创建临时节点,会话断开之后并且session超时之后,心跳停止,临时节点自动删除
create -e /test/temp test-temp
// 创建有序节点
create -s /test/seq seqqqq
// 删除节点
delete /test
// 设置值,命令执行之后,dataVersion的值会变化
set /test new-test
// 可以按照版本号操作,相当于是一个乐观锁
set /test new-test 0
// 以下两个watcher是针对父节点触发
stat /test watcher
get /test watcher
// 对子节点操作触发
ls /test watcher
// 获取节点的ACL相关信息
getAcl /test
// world形式是匿名用户,默认就是这个
setAcl /test world:anyone:rda
// 类似于登录,相当于添加一个用户,并以添加的用户进行登录
addauth digest immoc:immoc
// auth密码方式是明文,digest密码方式是密文
setAcl /test auth:immoc:immoc:rdwa
addauth digest immoc:immoc
// digest密码方式是密文
setAcl /test digest:immoc:密码加密后的密文:rdwa
setAcl /test ip:192.168.11.223:rdwa
Stat数据结构里面维护了一些节点的详细信息,Stat中记录了这个ZNode 的三个数据版本,分别是dataVersion(当前ZNode的版本)、cversion(当前ZNode子节点的版本)、aclVersion(当前ZNode的ACL版本)。
czxid:引起这个znode创建的zxid,创建节点的事务的zxid(ZooKeeper Transaction Id);
ctime:znode被创建的毫秒数(从1970年开始);
mzxid:znode最后更新的zxid;
mtime:znode最后修改的毫秒数(从1970年开始);
pZxid:znode最后更新的子节点zxid;
cversion:znode子节点变化号,znode子节点修改次数;
dataversion:znode数据变化号;
aclVersion:znode访问控制列表的变化号;
ephemeralOwner:如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0;
dataLength:znode的数据长度;
numChildren:znode子节点数量;
一些与服务器交互的简写命令
echo mntr | nc localhost 2181
Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是Zookeeper实现分布式协调服务的重要特性,可用于统一资源配置。
Watcher事件
stat /test watcher
get /test watcher
- 创建父节点触发:NodeCreated;
- 修改父节点触发:NoeDataChanged;
- 删除父节点触发:NodeDeleed;
ls /test watcher
- ls为父节点设置watcher,创建子节点触发:NodeChildrenChanged;
- ls为父节点设置watcher,删除子节点触发:NodeChildrenChanged;
Zookeeper采用ACL(AccessControlLists)策略来进行权限控制,类似于 UNIX 文件系统的权限控制。使用场景:开发/测试环境分离,开发者无权操作测试库的节点,只能查看;生产环境上控制指定IP的服务可以访问相关节点,防止混乱。
通过[scheme:id:permissions]来构成权限列表,和我们平时系统种的权限管理很像:
Zookeeper中定义了如下5种权限(crwda),其中尤其需要注意的是,CREATE和DELETE这两种权限都是针对子节点的权限控制:
getAcl /test
setAcl /test world:anyone:rda
// 类似于登录吧
addauth digest immoc:immoc
setAcl /test auth:immoc:immoc:rdwa
addauth digest immoc:immoc
setAcl /test digest:immoc:密码加密后的密文:rdwa
setAcl /test ip:192.168.11.223:rdwa
super
修改zkServer.sh配置文件,然后重启ZK
ZooKeeper集群中的所有机器通过一个Leader选举过程来选定一台称为 “Leader” 的机器,Leader 既可以为客户端提供写服务又能提供读服务。除了 Leader 外,Follower 和 Observer 都只能提供读服务。Follower 和 Observer 唯一的区别在于 Observer 机器不参与 Leader 的选举过程,也不参与写操作的“过半写成功”策略,因此 Observer 机器可以在不影响写性能的情况下提升集群的读性能。
Paxos算法应该可以说是ZooKeeper的灵魂,但是ZooKeeper并没有完全采用Paxos算法,而是使用ZAB协议作为其保证数据一致性的核心算法。另外,在ZooKeeper的官方文档中也指出,ZAB协议并不像 Paxos算法那样,是一种通用的分布式一致性算法,它是一种特别为Zookeeper设计的崩溃可恢复的原子消息广播算法。
ZAB(ZooKeeper Atomic Broadcast 原子广播)协议是为分布式协调服务ZooKeeper专门设计的一种支持崩溃恢复的原子广播协议。在ZooKeeper中,主要依赖ZAB协议来实现分布式数据一致性,基于该协议,ZooKeeper实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性。
ZAB协议包括两种基本的模式,分别是崩溃恢复和消息广播。当整个服务框架在启动过程中或是当Leader服务器出现网络中断、崩溃退出、重启等异常情况时,ZAB协议就会进人恢复模式并选举产生新的Leader服务器。当选举产生了新的Leader服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步之后,ZAB协议就会退出恢复模式进入广播模式。其中,所谓的状态同步是指数据同步,用来保证集群中存在过半的机器能够和Leader服务器的数据状态保持一致。当集群中有过半的Follower服务器完成了和Leader服务器的状态同步,那么整个服务框架就可以进人消息广播模式了。 当一台同样遵守ZAB协议的服务器启动后加人到集群中时,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么新加入的服务器就会自觉地进人数据恢复模式,找到Leader所在的服务器,并与其进行数据同步,然后一起参与到消息广播流程中去。 ZooKeeper设计成只允许唯一的一个Leader服务器来进行事务请求的处理,Leader服务器在接收到客户端的事务请求后,会生成对应的事务提案并发起一轮广播协议;而如果集群中的其他机器接收到客户端的事务请求,那么这些非Leader服务器会首先将这个事务请求转发给Leader服务器。
恢复模式大致可以分为四个阶段:
恢复过程的步骤大致可分为:
当 leader在commit之后但在发出commit消息之前宕机,即只有老leader自己commit了,而其它follower都没有收到commit消息 新的leader也必须保证这个proposal被提交.(新的leader会重新发送该proprosal的commit消息) 当 leader产生某个proprosal之后但在发出消息之前宕机,即只有老leader自己有这个proproal,当老的leader重启后(此时左右follower),新的leader必须保证老的leader必须丢弃这个proprosal.(新的leader会通知上线后的老leader截断其epoch对应的最后一个commit的位置)
/**
* 同步创建zk示例,原生api是异步的
*
* curator链接zookeeper的策略:ExponentialBackoffRetry
* baseSleepTimeMs:初始sleep的时间
* maxRetries:最大重试次数
* maxSleepMs:最大重试时间
*/
RetryPolicy retryPolicy1 = new ExponentialBackoffRetry(1000, 5);
/**
* curator链接zookeeper的策略:RetryNTimes
* n:重试的次数
* sleepMsBetweenRetries:每次重试间隔的时间
*/
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
/**
* curator链接zookeeper的策略:RetryOneTime
* sleepMsBetweenRetry:每次重试间隔的时间
*/
RetryPolicy retryPolicy = new RetryOneTime(3000);
/**
* 永远重试,不推荐使用
*/
RetryPolicy retryPolicy = new RetryForever(retryIntervalMs)
private CuratorFramework client = null;
private static final String zkServerPath = "192.168.11.223:2181";
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
String nodePath = "/super/imooc";
// 创建节点
byte[] data = "superme".getBytes();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, data);
// 删除节点
client.delete()
.guaranteed() // 如果删除失败,那么在后端还是继续会删除,直到成功
.deletingChildrenIfNeeded() // 如果有子节点,就删除
.withVersion(0)
.forPath(nodePath);
// 更新节点数据
byte[] newData = "batman".getBytes();
cto.client.setData().withVersion(0).forPath(nodePath, newData);
// 读取节点数据
Stat stat = new Stat();
byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("节点" + nodePath + "的数据为: " + new String(data));
System.out.println("该节点的版本号为: " + stat.getVersion());
// 查询子节点
List<String> childNodes = cto.client.getChildren().forPath(nodePath);
System.out.println("开始打印子节点:");
for (String s : childNodes) {
System.out.println(s);
}
// 判断节点是否存在,如果不存在则为空
Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");
// watcher事件,当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);
// CuratorWatcher
public class MyCuratorWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent event) throws Exception {
System.out.println("触发watcher,节点路径为:" + event.getPath());
}
}
// 原生的Watcher
public class MyWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
System.out.println("触发watcher,节点路径为:" + event.getPath());
}
}
// 为节点添加watcher,NodeCache: 监听数据节点的变更,会触发事件
final NodeCache nodeCache = new NodeCache(client, nodePath);
// buildInitial : 初始化的时候获取node的值并且缓存
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
} else {
System.out.println("节点初始化数据为空...");
}
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
if (nodeCache.getCurrentData() == null) {
System.out.println("空");
return;
}
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
}
});
// 为子节点添加watcher
// PathChildrenCache: 监听数据节点的增删改,会触发事件
String childNodePathCache = nodePath;
// cacheData: 设置缓存节点的数据状态
final PathChildrenCache childrenCache = new PathChildrenCache(client, childNodePathCache, true);
/**
* StartMode: 初始化方式
* POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
* NORMAL:异步初始化
* BUILD_INITIAL_CACHE:同步初始化
*/
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点数据列表:");
for (ChildData cd : childDataList) {
String childData = new String(cd.getData());
System.out.println(childData);
}
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("子节点初始化ok...");
}
else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
String path = event.getData().getPath();
if (path.equals(ADD_PATH)) {
System.out.println("添加子节点:" + event.getData().getPath());
System.out.println("子节点数据:" + new String(event.getData().getData()));
} else if (path.equals("/super/imooc/e")) {
System.out.println("添加不正确...");
}
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("删除子节点:" + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改子节点路径:" + event.getData().getPath());
System.out.println("修改子节点数据:" + new String(event.getData().getData()));
}
}
});
各个Client监听节点变化,然后做相应的的逻辑操作,即:修改配置文件
public class Client1 {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.1.110:2181";
public Client1() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
// public final static String CONFIG_NODE = "/super/imooc/redis-config";
public final static String CONFIG_NODE_PATH = "/super/imooc";
public final static String SUB_PATH = "/redis-config";
public static CountDownLatch countDown = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
Client1 cto = new Client1();
System.out.println("client1 启动成功...");
final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
// 添加监听事件
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
// 监听节点变化
if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
String configNodePath = event.getData().getPath();
if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);
// 读取节点数据
String jsonConfig = new String(event.getData().getData());
System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);
// 从json转换配置
RedisConfig redisConfig = null;
if (StringUtils.isNotBlank(jsonConfig)) {
redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
}
// 配置不为空则进行相应操作
if (redisConfig != null) {
String type = redisConfig.getType();
String url = redisConfig.getUrl();
String remark = redisConfig.getRemark();
// 判断事件
if (type.equals("add")) {
System.out.println("监听到新增的配置,准备下载...");
// ... 连接ftp服务器,根据url找到相应的配置
Thread.sleep(500);
System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
// ... 下载配置到你指定的目录
Thread.sleep(1000);
System.out.println("下载成功,已经添加到项目中");
// ... 拷贝文件到项目目录
} else if (type.equals("update")) {
System.out.println("监听到更新的配置,准备下载...");
// ... 连接ftp服务器,根据url找到相应的配置
Thread.sleep(500);
System.out.println("开始下载配置文件,下载路径为<" + url + ">");
// ... 下载配置到你指定的目录
Thread.sleep(1000);
System.out.println("下载成功...");
System.out.println("删除项目中原配置文件...");
Thread.sleep(100);
// ... 删除原文件
System.out.println("拷贝配置文件到项目目录...");
// ... 拷贝文件到项目目录
} else if (type.equals("delete")) {
System.out.println("监听到需要删除配置");
System.out.println("删除项目中原配置文件...");
}
// TODO 视情况统一重启服务
}
}
}
}
});
countDown.await();
cto.closeZKClient();
}
}
// 第一种方式
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
String nodePath = "/acl/father/child/sub";
List<ACL> acls = new ArrayList<ACL>();
Id imooc1 = new Id("digest", AclUtils.getDigestUserPwd("imooc1:123456"));
Id imooc2 = new Id("digest", AclUtils.getDigestUserPwd("imooc2:123456"));
acls.add(new ACL(Perms.ALL, imooc1));
acls.add(new ACL(Perms.READ, imooc2));
acls.add(new ACL(Perms.DELETE | Perms.CREATE, imooc2));
// 创建节点,递归创建
byte[] data = "spiderman".getBytes();
cto.client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(acls)
.forPath(nodePath, data);
第二种方式
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder().authorization("digest", "imooc1:123456".getBytes())
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
String nodePath = "/acl/father/child/sub";
List<ACL> acls = new ArrayList<ACL>();
Id imooc1 = new Id("digest", AclUtils.getDigestUserPwd("imooc1:123456"));
Id imooc2 = new Id("digest", AclUtils.getDigestUserPwd("imooc2:123456"));
acls.add(new ACL(Perms.ALL, imooc1));
acls.add(new ACL(Perms.READ, imooc2));
acls.add(new ACL(Perms.DELETE | Perms.CREATE, imooc2));
// 创建节点,递归创建
byte[] data = "spiderman".getBytes();
cto.client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(acls)
.forPath(nodePath, data);
设置权限
client.setACL().withACL(acls).forPath("/curatorNode");
第一种方式,通过指定节点进行判断,例如: /workspace/lock/lockkTest,客户端尝试创建这个节点,创建成功就处理业务,处理业务完成之后就释放锁,即:删除这个节点;创建失败就等待,直到创建成功
public class DistributedLock {
public final static String zkServerPath = "192.168.11.223:2181";
private final static String LOCK_PROJECT = "/lock";
private final static String LOCK_NODE = "/lockTest";
private static DistributedLock distributedLock = null;
private CuratorFramework client = null;
private CountDownLatch zkLockPath = new CountDownLatch(1);
private DistributedLock() {
createClient();
initNode();
}
public static DistributedLock getInstance() {
if (distributedLock == null) {
synchronized (DistributedLock.class) {
if (distributedLock == null) {
distributedLock = new DistributedLock();
}
}
}
return distributedLock;
}
private void createClient() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}
private void initNode() {
try {
if (client.checkExists().forPath(LOCK_PROJECT) == null) {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(LOCK_PROJECT);
}
addWatcherLock(LOCK_PROJECT);
} catch (Exception e) {
System.out.println("连接ZK失败");
e.printStackTrace();
}
}
/**
* 获取锁
*/
public void tryLock() {
while (true) {
try {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(LOCK_PROJECT + LOCK_NODE);
return;
} catch (Exception e) {
System.out.println("获取锁失败,阻塞线程");
try {
if (zkLockPath.getCount() <= 0) {
zkLockPath = new CountDownLatch(1);
}
zkLockPath.await();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
/**
* 监听
*
* @param path
* @throws Exception
*/
public void addWatcherLock(String path) throws Exception {
final PathChildrenCache cache = new PathChildrenCache(client, path, true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (PathChildrenCacheEvent.Type.CHILD_REMOVED.equals(event.getType())) {
String path = event.getData().getPath();
if (path.contains(LOCK_NODE)) {
zkLockPath.countDown();
}
}
}
});
}
/**
* 释放锁
*
* @return
*/
public boolean releaseLock() {
try {
if (client.checkExists().forPath(LOCK_PROJECT + LOCK_NODE) != null) {
client.delete().forPath(LOCK_PROJECT + LOCK_NODE);
}
} catch (Exception e) {
System.out.println("释放锁失败");
return false;
}
return true;
}
}
测试
public class Test {
public static void main(String[] args) {
DistributedLock distributedLock = DistributedLock.getInstance();
new Thread(() -> {
try {
distributedLock.tryLock();
System.out.println("TEST1 " + Thread.currentThread().getName() + " 获得锁开始处理业务 " + System.currentTimeMillis());
Thread.sleep(10000);
distributedLock.releaseLock();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
public class Test2 {
public static void main(String[] args) {
DistributedLock distributedLock = DistributedLock.getInstance();
new Thread(() -> {
try {
distributedLock.tryLock();
System.out.println("TEST2 " + Thread.currentThread().getName() + " 获得锁开始处理业务 " + System.currentTimeMillis());
Thread.sleep(10000);
distributedLock.releaseLock();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
public class DistributedLock {
public static final String zkServerPath = "192.168.11.223:2181";
private static DistributedLock distributedLock = null;
private CuratorFramework client = null;
private DistributedLock() {
createClient();
initNode();
}
public static DistributedLock getInstance() {
if (distributedLock == null) {
synchronized (DistributedLock.class) {
if (distributedLock == null) {
distributedLock = new DistributedLock();
}
}
}
return distributedLock;
}
private void createClient() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}
private void initNode() {
try {
if (client.checkExists().forPath("/lock") != null) {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/lock");
}
addWatcherLock("/lock");
} catch (Exception e) {
System.out.println("连接ZK失败");
e.printStackTrace();
}
}
private CountDownLatch zkLockPath = new CountDownLatch(1);
public void getLock() {
while (true) {
try {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/lock");
} catch (Exception e) {
e.printStackTrace();
try {
if (zkLockPath.getCount() <= 0) {
zkLockPath = new CountDownLatch(1);
}
zkLockPath.await();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public void addWatcherLock(String path) throws Exception {
final PathChildrenCache cache = new PathChildrenCache(client, path, true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (PathChildrenCacheEvent.Type.CHILD_REMOVED.equals(event.getType())) {
String path = event.getData().getPath();
if (path.contains("lock")) {
zkLockPath.countDown();
}
}
}
});
}
public boolean releaseLock() {
try {
if (client.checkExists().forPath("/lock") != null) {
client.delete().forPath("/lock");
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
}
ZooKeeper会为每一个事务生成一个唯一且递增长度为64位的ZXID,ZXID由两部分组成:低32位表示计数器(counter)和高32位的纪元号(epoch)。epoch为当前leader在成为leader的时候生成的,且保证会比前一个leader的epoch大。
实际上当新的leader选举成功后,会拿到当前集群中最大的一个ZXID,并去除这个ZXID的epoch,并将此epoch进行加1操作,作为自己的epoch
history queue:每一个follower节点都会有一个先进先出(FIFO)的队列用来存放收到的事务请求,保证执行事务的顺序。 可靠提交由ZAB的事务一致性协议保证,全局有序由TCP协议保证,因果有序由follower的历史队列(history queue)保证。