ZookeeperZNode基本命令四字命令SessionWatcherACLZookeeper集群Paxos算法ZAB协议Curator分布式锁

ZooKeeper是一个典型的分布式数据一致性解决方案,特点如下:

  • 顺序一致性:从同一客户端发起的事务请求,最终将会严格地按照顺序被应用到ZooKeeper中去;
  • 原子性:所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,也就是说,要么整个集群中所有的机器都成功应用了某一个事务,要么都没有应用;
  • 单一系统映像:无论客户端连到哪一个 ZooKeeper 服务器上,其看到的服务端数据模型都是一致的;
  • 可靠性:一旦一次更改请求被应用,更改的结果就会被持久化,直到被下一次更改覆盖;

设计目标:

  • 顺序访问:对于来自客户端的每个更新请求,ZooKeeper都会分配一个全局唯一的递增编号,这个编号反应了所有事务操作的先后顺序,应用程序可以使用 ZooKeeper 这个特性来实现更高层次的同步原语。 这个编号也叫做时间戳(zxid,即:Zookeeper Transaction Id)
  • 高性能:ZooKeeper是高性能的,在读多于写的应用程序中尤其地高性能,因为写会导致所有的服务器间同步状态(读多于写是协调服务的典型场景)

主要应用场景如下:

  • 数据发布/订阅;
  • 负载均衡;
  • 命名服务;
  • 分布式协调/通知;
  • 集群管理;
  • Master 选举;
  • 分布式锁;
  • 分布式队列;

ZNode

在Zookeeper中,ZNode可以分为持久节点和临时节点两类。所谓持久节点是指一旦这个ZNode被创建了,除非主动进行ZNode的移除操作,否则这个ZNode将一直保存在Zookeeper上。而临时节点就不一样了,它的生命周期和客户端会话绑定,一旦客户端会话失效,那么这个客户端创建的所有临时节点都会被移除。另外,ZooKeeper还允许用户为每个节点添加一个特殊的属性:SEQUENTIAL.一旦节点被标记上这个属性,那么在这个节点被创建的时候,Zookeeper会自动在其节点名后面追加上一个整型数字,这个整型数字是一个由父节点维护的自增数字。

基本命令

// 查看节点
ls /test
// stat可以查看节点的一些详细信息
stat /test
// ls2相当于是ls和stat两个命令的组合
ls2 /test

// 创建节
create /test test
// 创建临时节点,会话断开之后并且session超时之后,心跳停止,临时节点自动删除
create -e /test/temp test-temp
// 创建有序节点
create -s /test/seq seqqqq

// 删除节点
delete /test 

// 设置值,命令执行之后,dataVersion的值会变化
set /test new-test
// 可以按照版本号操作,相当于是一个乐观锁
set /test new-test 0

// 以下两个watcher是针对父节点触发
stat /test watcher
get /test watcher

// 对子节点操作触发
ls /test watcher

// 获取节点的ACL相关信息
getAcl /test

// world形式是匿名用户,默认就是这个
setAcl /test world:anyone:rda

// 类似于登录,相当于添加一个用户,并以添加的用户进行登录
addauth digest immoc:immoc
// auth密码方式是明文,digest密码方式是密文
setAcl /test auth:immoc:immoc:rdwa

addauth digest immoc:immoc
// digest密码方式是密文
setAcl /test digest:immoc:密码加密后的密文:rdwa

setAcl /test ip:192.168.11.223:rdwa

Stat属性

Stat数据结构里面维护了一些节点的详细信息,Stat中记录了这个ZNode 的三个数据版本,分别是dataVersion(当前ZNode的版本)、cversion(当前ZNode子节点的版本)、aclVersion(当前ZNode的ACL版本)。

czxid:引起这个znode创建的zxid,创建节点的事务的zxid(ZooKeeper Transaction Id);
ctime:znode被创建的毫秒数(从1970年开始);
mzxid:znode最后更新的zxid;
mtime:znode最后修改的毫秒数(从1970年开始);
pZxid:znode最后更新的子节点zxid;
cversion:znode子节点变化号,znode子节点修改次数;
dataversion:znode数据变化号;
aclVersion:znode访问控制列表的变化号;
ephemeralOwner:如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0;
dataLength:znode的数据长度;
numChildren:znode子节点数量;

四字命令

一些与服务器交互的简写命令

echo mntr | nc localhost 2181

Session

  • 客户端与服务端之间的连接存在会话;
  • 每个会话可以设置超时时间;
  • 心跳结束,则Session会过期;
  • Session过期,则临时节点会被删除;

Watcher

Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是Zookeeper实现分布式协调服务的重要特性,可用于统一资源配置。

  • 针对每个节点的操作,都会有一个watcher;
  • 当监控的某个节点发生变化,则触发watcher事件;
  • ZK中的watcher是一次性的,出发后立即销毁;
  • 父节点子节点都可以触发watcher事件;
  • 针对不同的操作类型,触发watcher的事件也不同:增、删、改。

Watcher事件

stat /test watcher
get /test watcher
- 创建父节点触发:NodeCreated;
- 修改父节点触发:NoeDataChanged;
- 删除父节点触发:NodeDeleed;


ls /test watcher
- ls为父节点设置watcher,创建子节点触发:NodeChildrenChanged;
- ls为父节点设置watcher,删除子节点触发:NodeChildrenChanged;

ACL

Zookeeper采用ACL(AccessControlLists)策略来进行权限控制,类似于 UNIX 文件系统的权限控制。使用场景:开发/测试环境分离,开发者无权操作测试库的节点,只能查看;生产环境上控制指定IP的服务可以访问相关节点,防止混乱。

ACL命令

  • getAcl:获取某个节点的ACL权限信息;
  • setAcl:设置某个节点的ACL权限信息;
  • addauth:输入认证信息,注册时输入明文注册,ZK中对密码时加密的;

ACL构成

通过[scheme:id:permissions]来构成权限列表,和我们平时系统种的权限管理很像:

  • scheme:采用的某种权限机制;
  • id:访问的用户;
  • permissions:权限列表;
scheme
  • world:匿名访问,world形式下只有一个用户:anyone;
  • auth:密码是明文;
  • digest:密码是密文;
  • ip:通过限制IP访问;
  • supper:超级管理员工,需要修改配置文件并重启ZK;
permissions

Zookeeper中定义了如下5种权限(crwda),其中尤其需要注意的是,CREATE和DELETE这两种权限都是针对子节点的权限控制:

  • CREATE:创建子节点的权限;
  • READ:获取节点数据和子节点列表的权限;
  • WRITE:更新节点数据的权限;
  • DELETE:删除子节点的权限;
  • ADMIN:创设置子节点ACL的权限;

相关操作

getAcl /test

setAcl /test world:anyone:rda

// 类似于登录吧
addauth digest immoc:immoc
setAcl /test auth:immoc:immoc:rdwa

addauth digest immoc:immoc
setAcl /test digest:immoc:密码加密后的密文:rdwa

setAcl /test ip:192.168.11.223:rdwa

super
修改zkServer.sh配置文件,然后重启ZK

Zookeeper集群

  • 配置数据文件 myid 1/2/3,对用server.1/2/3
  • 通过 ./zkClish.sh -server [ip]:[port] 检测集群是否配置 最典型集群模式: Master/Slave 模式(主备模式)。在这种模式中,通常 Master服务器作为主服务器提供写服务,其他的 Slave 服务器从服务器通过异步复制的方式获取 Master 服务器最新的数据提供读服务。但是,在 ZooKeeper 中没有选择传统的 Master/Slave 概念,而是引入了Leader、Follower 和 Observer 三种角色。

ZooKeeper集群中的所有机器通过一个Leader选举过程来选定一台称为 “Leader” 的机器,Leader 既可以为客户端提供写服务又能提供读服务。除了 Leader 外,Follower 和 Observer 都只能提供读服务。Follower 和 Observer 唯一的区别在于 Observer 机器不参与 Leader 的选举过程,也不参与写操作的“过半写成功”策略,因此 Observer 机器可以在不影响写性能的情况下提升集群的读性能。

Paxos算法

图解Paxos一致性协议

Paxos算法应该可以说是ZooKeeper的灵魂,但是ZooKeeper并没有完全采用Paxos算法,而是使用ZAB协议作为其保证数据一致性的核心算法。另外,在ZooKeeper的官方文档中也指出,ZAB协议并不像 Paxos算法那样,是一种通用的分布式一致性算法,它是一种特别为Zookeeper设计的崩溃可恢复的原子消息广播算法。

ZAB协议

ZAB协议分析

ZAB(ZooKeeper Atomic Broadcast 原子广播)协议是为分布式协调服务ZooKeeper专门设计的一种支持崩溃恢复的原子广播协议。在ZooKeeper中,主要依赖ZAB协议来实现分布式数据一致性,基于该协议,ZooKeeper实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性。

ZAB协议包括两种基本的模式,分别是崩溃恢复和消息广播。当整个服务框架在启动过程中或是当Leader服务器出现网络中断、崩溃退出、重启等异常情况时,ZAB协议就会进人恢复模式并选举产生新的Leader服务器。当选举产生了新的Leader服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步之后,ZAB协议就会退出恢复模式进入广播模式。其中,所谓的状态同步是指数据同步,用来保证集群中存在过半的机器能够和Leader服务器的数据状态保持一致。当集群中有过半的Follower服务器完成了和Leader服务器的状态同步,那么整个服务框架就可以进人消息广播模式了。 当一台同样遵守ZAB协议的服务器启动后加人到集群中时,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么新加入的服务器就会自觉地进人数据恢复模式,找到Leader所在的服务器,并与其进行数据同步,然后一起参与到消息广播流程中去。 ZooKeeper设计成只允许唯一的一个Leader服务器来进行事务请求的处理,Leader服务器在接收到客户端的事务请求后,会生成对应的事务提案并发起一轮广播协议;而如果集群中的其他机器接收到客户端的事务请求,那么这些非Leader服务器会首先将这个事务请求转发给Leader服务器。

广播模式

  1. leader从客户端收到一个写请求;
  2. leader生成一个新的事务并为这个事务生成一个唯一的ZXID;
  3. leader将这个事务发送给所有的follows节点;
  4. follower节点将收到的事务请求加入到历史队列(history queue)中,并发送ack给leader;
  5. 当leader收到大多数follower(超过法定数量)的ack消息,leader会发送commit请求;
  6. 当follower收到commit请求时,会判断该事务的ZXID是不是比历史队列中的任何事务的ZXID都小,如果是则提交,如果不是则等待比它更小的事务的commit;

恢复模式

恢复模式大致可以分为四个阶段:

  1. 选举;
  2. 发现;
  3. 同步;
  4. 广播;

恢复过程的步骤大致可分为:

  1. 当leader崩溃后,集群进入选举阶段,开始选举出潜在的新leader(一般为集群中拥有最大ZXID的节点);
  2. 进入发现阶段,follower与潜在的新leader进行沟通,如果发现超过法定人数的follower同意,则潜在的新leader将epoch加1,进入新的纪元,新的leader产生;
  3. 集群间进行数据同步,保证集群中各个节点的事务一致;
  4. 集群恢复到广播模式,开始接受客户端的写请求;

当 leader在commit之后但在发出commit消息之前宕机,即只有老leader自己commit了,而其它follower都没有收到commit消息 新的leader也必须保证这个proposal被提交.(新的leader会重新发送该proprosal的commit消息) 当 leader产生某个proprosal之后但在发出消息之前宕机,即只有老leader自己有这个proproal,当老的leader重启后(此时左右follower),新的leader必须保证老的leader必须丢弃这个proprosal.(新的leader会通知上线后的老leader截断其epoch对应的最后一个commit的位置)

Curator

  • 超时重连接;
  • watcher注册一次永久生效;
  • 支持递归创建节点;
  • 提供更多解决方案并且简单:分布式锁;
  • 提供常用工具类;
  • 编程风格;

自动重试

/**
 * 同步创建zk示例,原生api是异步的
 * 
 * curator链接zookeeper的策略:ExponentialBackoffRetry
 * baseSleepTimeMs:初始sleep的时间
 * maxRetries:最大重试次数
 * maxSleepMs:最大重试时间
*/
RetryPolicy retryPolicy1 = new ExponentialBackoffRetry(1000, 5);

/**
 * curator链接zookeeper的策略:RetryNTimes
 * n:重试的次数
 * sleepMsBetweenRetries:每次重试间隔的时间
 */
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

/**
 * curator链接zookeeper的策略:RetryOneTime
 * sleepMsBetweenRetry:每次重试间隔的时间
 */
RetryPolicy retryPolicy = new RetryOneTime(3000);

/**
 * 永远重试,不推荐使用
*/
RetryPolicy retryPolicy = new RetryForever(retryIntervalMs)

构建Client

private CuratorFramework client = null;
private static final String zkServerPath = "192.168.11.223:2181";
client = CuratorFrameworkFactory.builder()
        .connectString(zkServerPath)
        .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
        .namespace("workspace").build();
client.start();

节点操作

String nodePath = "/super/imooc";

// 创建节点
byte[] data = "superme".getBytes();
client.create().creatingParentsIfNeeded()
    .withMode(CreateMode.PERSISTENT)
    .withACL(Ids.OPEN_ACL_UNSAFE)
    .forPath(nodePath, data);

// 删除节点
client.delete()
        .guaranteed()                   // 如果删除失败,那么在后端还是继续会删除,直到成功
        .deletingChildrenIfNeeded() // 如果有子节点,就删除
        .withVersion(0)
        .forPath(nodePath);

// 更新节点数据
byte[] newData = "batman".getBytes();
cto.client.setData().withVersion(0).forPath(nodePath, newData);

// 读取节点数据
Stat stat = new Stat();
byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("节点" + nodePath + "的数据为: " + new String(data));
System.out.println("该节点的版本号为: " + stat.getVersion());

// 查询子节点
List<String> childNodes = cto.client.getChildren().forPath(nodePath);
System.out.println("开始打印子节点:");
for (String s : childNodes) {
    System.out.println(s);
}

// 判断节点是否存在,如果不存在则为空
Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");

Watcher

// watcher事件,当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);

// CuratorWatcher
public class MyCuratorWatcher implements CuratorWatcher {
    @Override
    public void process(WatchedEvent event) throws Exception {
        System.out.println("触发watcher,节点路径为:" + event.getPath());
    }
}

// 原生的Watcher
public class MyWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        System.out.println("触发watcher,节点路径为:" + event.getPath());
    }
}

// 为节点添加watcher,NodeCache: 监听数据节点的变更,会触发事件
final NodeCache nodeCache = new NodeCache(client, nodePath);
// buildInitial : 初始化的时候获取node的值并且缓存
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
    System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
} else {
    System.out.println("节点初始化数据为空...");
}
nodeCache.getListenable().addListener(new NodeCacheListener() {
    public void nodeChanged() throws Exception {
        if (nodeCache.getCurrentData() == null) {
            System.out.println("空");
            return;
        }
        String data = new String(nodeCache.getCurrentData().getData());
        System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
    }
});


// 为子节点添加watcher
// PathChildrenCache: 监听数据节点的增删改,会触发事件
String childNodePathCache =  nodePath;
// cacheData: 设置缓存节点的数据状态
final PathChildrenCache childrenCache = new PathChildrenCache(client, childNodePathCache, true);
/**
 * StartMode: 初始化方式
 * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
 * NORMAL:异步初始化
 * BUILD_INITIAL_CACHE:同步初始化
 */
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);

List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点数据列表:");
for (ChildData cd : childDataList) {
    String childData = new String(cd.getData());
    System.out.println(childData);
}

childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
            System.out.println("子节点初始化ok...");
        }
        
        else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
            String path = event.getData().getPath();
            if (path.equals(ADD_PATH)) {
                System.out.println("添加子节点:" + event.getData().getPath());
                System.out.println("子节点数据:" + new String(event.getData().getData()));
            } else if (path.equals("/super/imooc/e")) {
                System.out.println("添加不正确...");
            }
            
        }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
            System.out.println("删除子节点:" + event.getData().getPath());
        }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
            System.out.println("修改子节点路径:" + event.getData().getPath());
            System.out.println("修改子节点数据:" + new String(event.getData().getData()));
        }
    }
});

Watcher统一配置

各个Client监听节点变化,然后做相应的的逻辑操作,即:修改配置文件

public class Client1 {

    public CuratorFramework client = null;
    public static final String zkServerPath = "192.168.1.110:2181";

    public Client1() {
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }
    
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }
    
//  public final static String CONFIG_NODE = "/super/imooc/redis-config";
    public final static String CONFIG_NODE_PATH = "/super/imooc";
    public final static String SUB_PATH = "/redis-config";
    public static CountDownLatch countDown = new CountDownLatch(1);
    
    public static void main(String[] args) throws Exception {
        Client1 cto = new Client1();
        System.out.println("client1 启动成功...");
        
        final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
        childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
        
        // 添加监听事件
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                // 监听节点变化
                if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                    String configNodePath = event.getData().getPath();
                    if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
                        System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);
                        
                        // 读取节点数据
                        String jsonConfig = new String(event.getData().getData());
                        System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);
                        
                        // 从json转换配置
                        RedisConfig redisConfig = null;
                        if (StringUtils.isNotBlank(jsonConfig)) {
                            redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
                        }
                        
                        // 配置不为空则进行相应操作
                        if (redisConfig != null) {
                            String type = redisConfig.getType();
                            String url = redisConfig.getUrl();
                            String remark = redisConfig.getRemark();
                            // 判断事件
                            if (type.equals("add")) {
                                System.out.println("监听到新增的配置,准备下载...");
                                // ... 连接ftp服务器,根据url找到相应的配置
                                Thread.sleep(500);
                                System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
                                // ... 下载配置到你指定的目录
                                Thread.sleep(1000);
                                System.out.println("下载成功,已经添加到项目中");
                                // ... 拷贝文件到项目目录
                            } else if (type.equals("update")) {
                                System.out.println("监听到更新的配置,准备下载...");
                                // ... 连接ftp服务器,根据url找到相应的配置
                                Thread.sleep(500);
                                System.out.println("开始下载配置文件,下载路径为<" + url + ">");
                                // ... 下载配置到你指定的目录
                                Thread.sleep(1000);
                                System.out.println("下载成功...");
                                System.out.println("删除项目中原配置文件...");
                                Thread.sleep(100);
                                // ... 删除原文件
                                System.out.println("拷贝配置文件到项目目录...");
                                // ... 拷贝文件到项目目录
                            } else if (type.equals("delete")) {
                                System.out.println("监听到需要删除配置");
                                System.out.println("删除项目中原配置文件...");
                            }
                            
                            // TODO 视情况统一重启服务
                        }
                    }
                }
            }
        });
        
        countDown.await();
        
        cto.closeZKClient();
    }
}

ACL

// 第一种方式

RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
        .connectString(zkServerPath)
        .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
        .namespace("workspace").build();
client.start();

String nodePath = "/acl/father/child/sub";

List<ACL> acls = new ArrayList<ACL>();
Id imooc1 = new Id("digest", AclUtils.getDigestUserPwd("imooc1:123456"));
Id imooc2 = new Id("digest", AclUtils.getDigestUserPwd("imooc2:123456"));
acls.add(new ACL(Perms.ALL, imooc1));
acls.add(new ACL(Perms.READ, imooc2));
acls.add(new ACL(Perms.DELETE | Perms.CREATE, imooc2));

// 创建节点,递归创建
byte[] data = "spiderman".getBytes();
cto.client.create().creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .withACL(acls)
        .forPath(nodePath, data);

第二种方式

RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder().authorization("digest", "imooc1:123456".getBytes())
        .connectString(zkServerPath)
        .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
        .namespace("workspace").build();
client.start();

String nodePath = "/acl/father/child/sub";

List<ACL> acls = new ArrayList<ACL>();
Id imooc1 = new Id("digest", AclUtils.getDigestUserPwd("imooc1:123456"));
Id imooc2 = new Id("digest", AclUtils.getDigestUserPwd("imooc2:123456"));
acls.add(new ACL(Perms.ALL, imooc1));
acls.add(new ACL(Perms.READ, imooc2));
acls.add(new ACL(Perms.DELETE | Perms.CREATE, imooc2));

// 创建节点,递归创建
byte[] data = "spiderman".getBytes();
cto.client.create().creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .withACL(acls)
        .forPath(nodePath, data);

设置权限

client.setACL().withACL(acls).forPath("/curatorNode");

分布式锁

Zookeeper实现分布式锁

通过判断节点是否存在

第一种方式,通过指定节点进行判断,例如: /workspace/lock/lockkTest,客户端尝试创建这个节点,创建成功就处理业务,处理业务完成之后就释放锁,即:删除这个节点;创建失败就等待,直到创建成功

public class DistributedLock {
    public final static String zkServerPath = "192.168.11.223:2181";

    private final static String LOCK_PROJECT = "/lock";

    private final static String LOCK_NODE = "/lockTest";

    private static DistributedLock distributedLock = null;

    private CuratorFramework client = null;

    private CountDownLatch zkLockPath = new CountDownLatch(1);

    private DistributedLock() {
        createClient();
        initNode();
    }

    public static DistributedLock getInstance() {
        if (distributedLock == null) {
            synchronized (DistributedLock.class) {
                if (distributedLock == null) {
                    distributedLock = new DistributedLock();
                }
            }
        }
        return distributedLock;
    }


    private void createClient() {
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    private void initNode() {
        try {
            if (client.checkExists().forPath(LOCK_PROJECT) == null) {
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(LOCK_PROJECT);
            }

            addWatcherLock(LOCK_PROJECT);

        } catch (Exception e) {
            System.out.println("连接ZK失败");
            e.printStackTrace();
        }
    }

    /**
     * 获取锁
     */
    public void tryLock() {
        while (true) {
            try {
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(LOCK_PROJECT + LOCK_NODE);
                return;
            } catch (Exception e) {
                System.out.println("获取锁失败,阻塞线程");
                try {
                    if (zkLockPath.getCount() <= 0) {
                        zkLockPath = new CountDownLatch(1);
                    }
                    zkLockPath.await();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     * 监听
     *
     * @param path
     * @throws Exception
     */
    public void addWatcherLock(String path) throws Exception {
        final PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (PathChildrenCacheEvent.Type.CHILD_REMOVED.equals(event.getType())) {
                    String path = event.getData().getPath();
                    if (path.contains(LOCK_NODE)) {
                        zkLockPath.countDown();
                    }
                }
            }
        });
    }

    /**
     * 释放锁
     *
     * @return
     */
    public boolean releaseLock() {
        try {
            if (client.checkExists().forPath(LOCK_PROJECT + LOCK_NODE) != null) {
                client.delete().forPath(LOCK_PROJECT + LOCK_NODE);
            }
        } catch (Exception e) {
            System.out.println("释放锁失败");
            return false;
        }
        return true;
    }
}

测试

public class Test {
    public static void main(String[] args) {
        DistributedLock distributedLock = DistributedLock.getInstance();

        new Thread(() -> {
            try {
                distributedLock.tryLock();
                System.out.println("TEST1 " + Thread.currentThread().getName() + " 获得锁开始处理业务 " + System.currentTimeMillis());
                Thread.sleep(10000);
                distributedLock.releaseLock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

    }
}

public class Test2 {
    public static void main(String[] args) {
        DistributedLock distributedLock = DistributedLock.getInstance();

        new Thread(() -> {
            try {
                distributedLock.tryLock();
                System.out.println("TEST2 " + Thread.currentThread().getName() + " 获得锁开始处理业务 " + System.currentTimeMillis());
                Thread.sleep(10000);
                distributedLock.releaseLock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

通过判断节点是否存在

  1. 客户端连接zookeeper,并在/lock下创建临时的且有序的子节点,第一个客户端对应的子节点为/lock/lock-0000000000,第二个为/lock/lock-0000000001,以此类推;
  2. 客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁;
  3. 执行业务代码;
  4. 完成业务流程后,删除对应的子节点释放锁;
public class DistributedLock {
    public static final String zkServerPath = "192.168.11.223:2181";

    private static DistributedLock distributedLock = null;

    private CuratorFramework client = null;

    private DistributedLock() {
        createClient();
        initNode();
    }

    public static DistributedLock getInstance() {
        if (distributedLock == null) {
            synchronized (DistributedLock.class) {
                if (distributedLock == null) {
                    distributedLock = new DistributedLock();
                }
            }
        }
        return distributedLock;
    }


    private void createClient() {
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    private void initNode() {
        try {
            if (client.checkExists().forPath("/lock") != null) {
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath("/lock");
            }

            addWatcherLock("/lock");

        } catch (Exception e) {
            System.out.println("连接ZK失败");
            e.printStackTrace();
        }
    }


    private CountDownLatch zkLockPath = new CountDownLatch(1);


    public void getLock() {
        while (true) {
            try {
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath("/lock");
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    if (zkLockPath.getCount() <= 0) {
                        zkLockPath = new CountDownLatch(1);
                    }
                    zkLockPath.await();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    public void addWatcherLock(String path) throws Exception {
        final PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (PathChildrenCacheEvent.Type.CHILD_REMOVED.equals(event.getType())) {
                    String path = event.getData().getPath();
                    if (path.contains("lock")) {
                        zkLockPath.countDown();
                    }
                }
            }
        });
    }


    public boolean releaseLock() {
        try {
            if (client.checkExists().forPath("/lock") != null) {
                client.delete().forPath("/lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}

ZXID

ZooKeeper会为每一个事务生成一个唯一且递增长度为64位的ZXID,ZXID由两部分组成:低32位表示计数器(counter)和高32位的纪元号(epoch)。epoch为当前leader在成为leader的时候生成的,且保证会比前一个leader的epoch大。

实际上当新的leader选举成功后,会拿到当前集群中最大的一个ZXID,并去除这个ZXID的epoch,并将此epoch进行加1操作,作为自己的epoch

history queue:每一个follower节点都会有一个先进先出(FIFO)的队列用来存放收到的事务请求,保证执行事务的顺序。 可靠提交由ZAB的事务一致性协议保证,全局有序由TCP协议保证,因果有序由follower的历史队列(history queue)保证。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏安恒网络空间安全讲武堂

Python编写渗透工具学习笔记二 | 0x02利用FTP与web批量抓肉鸡

0x02利用FTP与web批量抓肉鸡 脚本要实现的目标和思路: 先尝试匿名登录ftp,当匿名登录失败时再尝试用用户/密码爆破登录,登录成功后,脚本会搜索ftp中...

1.6K7
来自专栏成猿之路

关于Java Tomcat 内存溢出排查心得分享

2163
来自专栏向治洪

Android监听自身卸载,弹出用户反馈调查

1,情景分析         在上上篇博客中我写了一下NDK开发实践项目,使用开源的LAME库转码MP3,作为前面几篇基础博客的加深理解使用的,但是这样的项目用...

2995
来自专栏CSDN技术头条

一组 Redis 实际应用中的异常场景及其根因分析和解决方案

在上一场 Chat《基于 Redis 的分布式缓存实现方案及可靠性加固策略》中,我已经较为全面的介绍了 Redis 的原理和分布式缓存方案。如果只是从“会用”的...

3333
来自专栏张首富-小白的成长历程

redis缓存服务器

#你当前没有指定配置文件,以默认的配置文件启动,如果你想指定配置文件你可以redis-server 文件所在位置

4542
来自专栏容器云生态

Openstack平台搭建之第二天

Openstack平台搭建之第二天 If you have any question ,please contact me by weichuangxxb@si...

47010
来自专栏Esofar 开发日记

[译]RabbitMQ教程C#版 - 远程过程调用(RPC)

但是如果我们想要运行一个在远程计算机上的函数并等待其结果呢?这将是另外一回事了。这种模式通常被称为 远程过程调用 或 RPC 。

1032
来自专栏博客园迁移

redis见解

http://blog.csdn.net/zhiguozhu/article/details/50517527 Redis 原生session与redis中的s...

1771
来自专栏difcareer的技术笔记

ELF中可以被修改又不影响执行的区域

看雪上这篇文章讲述了两种对so进行加固的方法:1. 分离section,对整个section进行加密。2.在.text section直接寻找目标函数并进行加密...

1234
来自专栏Esofar 开发日记

[译]RabbitMQ教程C#版 - 远程过程调用(RPC)

但是如果我们想要运行一个在远程计算机上的函数并等待其结果呢?这将是另外一回事了。这种模式通常被称为 远程过程调用 或 RPC 。

1650

扫码关注云+社区

领取腾讯云代金券