Zookeeper作为分布式协调组件,在分布式的架构中承担着不可替代的作用,在与dubbo组合的架构中承担着注册中心的重任,有些SC项目也会采用Zookeeper作为配置中心,更多的时候,zookeeper作为分布式锁的的重要实现手段。现来总结一下Zookeeper的watch机制,一方面能够深入加强自己对zk的理解应用,另一方面也能更好地把控zk对我们项目的支持。
先简单总结一下ZK的watch机制:
watch机制是ZK比较重要的一个特性,zk允许我们在创建节点、改变节点数据、删除节点、子节点发生变化的时候添加事件监听(watch)用来对当前事件进行监听。当数据发生变化的时候 ,zk产生一个watcher事件推送到客户端,但这只能监听到一次,可以通过循环监听实现永久监听效果。
那什么时候才会触发watch事件呢?
凡是事务类型的操作都会触发watch事件,例如create/delete/setData等
如何注册监听呢?
可用通过以下三个方式绑定:getData()、Exists()、getChildren().
watcher 事件类型
None (-1), 客户端链接状态发生变化的时候,会 收到 none 的事件 NodeCreated (1), 创建节点的事件。 比如 zkpersis-mic
NodeDeleted (2), 删除节点的事件
NodeDataChanged (3), 节点数据发生变更
NodeChildrenChanged (4); 子节点被创建、被删除、会 发生事件触发
附录利用ZKClient和Curator两种方式进行简单API的操作
public static void main(String[] args) throws IOException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
countDownLatch.countDown();
}
}
});
try {
countDownLatch.await();//在真正的状态变为Connected之前一直阻塞,知道执行countDown()方法
System.out.println(zooKeeper.getState());
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
// String string = zooKeeper.create("/study","hello".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
//System.out.println(string);
String message = new String(zooKeeper.getData("/study", null, new Stat()));
System.out.println("message -> "+message);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Stat stat = zooKeeper.setData("/hello","world".getBytes(), Integer.parseInt(Version.getVersion()));
System.out.println("Stat -> "+stat.toString());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
利用原生的ZkClient实现节点的CRUD
利用Curator客户端实现相同功能
创建连接的两种方式:采用静态工厂、builder模式
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(100000)
.connectionTimeoutMs(5000)
.retryPolicy(
new ExponentialBackoffRetry(1000,3))
.namespace("test")
.build();
//两种创建方式
CuratorFramework client = CuratorFrameworkFactory
.newClient("127.0.0.1:2181",5000,5000,
new ExponentialBackoffRetry(1000,3));
创建一个持久节点/cui
curatorFramework.start();//开启客户端功能
System.err.println(curatorFramework.getState());
String path = curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/cui","hello world".getBytes());
System.out.println("path -> "+path+", and value -> "
+curatorFramework.getData().forPath("/cui"));
创建节点模式有以下四种:
一般的我们会依据这四种节点特性去实现不同的功能,例如接下来我们会采用ZK的临时有序节点去完成分布式锁的实现。
实现基本的CRUD都有相应的API操作,且操作简单,都是采用流式API的风格。Curator为我们提供了事务支持,即多个对节点的操作被看作是一个原子操作,例如下面的例子:
curatorFramework.inTransaction().check().forPath("/transaction")
.and().
setData().forPath("/transaction","transaction".getBytes())
.and().
create().forPath("/transaction1","transaction1".getBytes())
.and().
commit();
同时Curator也为我们提供了异步操作,主要依赖于BackgroundCallback接口
如下所示:
Executor executor = Executors.newFixedThreadPool(2);
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.inBackground((curator, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor)
.forPath("/path");
System.in.read();//阻塞住,方便查看异步操作后的效果
Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。
PathCache:监控路径下的子节点
NodeCache:监控特定的节点
TreeCache:监控所有的节点
curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
if (connectionState.isConnected()){
try {
String path = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/listener");
System.err.println("path -> "+path);
} catch (Exception e) {
e.printStackTrace();
}
}
}
},Executors.newFixedThreadPool(1));
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/cui");
分布式锁的实现:(基于ZKCliet实现)
public class DistributeLock implements Lock,Watcher {
private ZooKeeper zk=null;
private String ROOT_LOCK="/locks"; //定义根节点
private String WAIT_LOCK; //等待前一个锁
private String CURRENT_LOCK; //表示当前的锁
private CountDownLatch countDownLatch; //
public DistributeLock() {
try {
zk=new ZooKeeper("192.168.11.153:2181",
4000,this);
//判断根节点是否存在
Stat stat=zk.exists(ROOT_LOCK,false);
if(stat==null){
try {
zk.create(ROOT_LOCK,"0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public boolean tryLock() {
try {
//创建临时有序节点
CURRENT_LOCK=zk.create(ROOT_LOCK+"/","0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()+"->"+
CURRENT_LOCK+",尝试竞争锁");
List<String> childrens=zk.getChildren(ROOT_LOCK,false); //获取根节点下的所有子节点
SortedSet<String> sortedSet=new TreeSet();//定义一个集合进行排序
for(String children:childrens){
sortedSet.add(ROOT_LOCK+"/"+children);
}
String firstNode=sortedSet.first(); //获得当前所有子节点中最小的节点
SortedSet<String> lessThenMe=((TreeSet<String>) sortedSet).headSet(CURRENT_LOCK); //
if(CURRENT_LOCK.equals(firstNode)){//通过当前的节点和子节点中最小的节点进行比较,如果相等,表示获得锁成功
return true;
}
if(!lessThenMe.isEmpty()){
WAIT_LOCK=lessThenMe.last();//获得比当前节点更小的最后一个节点,设置给WAIT_LOCK
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public void lock() {
if(this.tryLock()){ //如果获得锁成功
System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+"->获得锁成功");
return;
}
try {
waitForLock(WAIT_LOCK); //没有获得锁,继续等待获得锁
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private boolean waitForLock(String prev) throws KeeperException, InterruptedException {
Stat stat=zk.exists(prev,true);//监听当前节点的上一个节点
if(stat!=null){
System.out.println(Thread.currentThread().getName()+"->等待锁"+ROOT_LOCK+"/"+prev+"释放");
countDownLatch=new CountDownLatch(1);
countDownLatch.await();
//TODO watcher触发以后,还需要再次判断当前等待的节点是不是最小的
System.out.println(Thread.currentThread().getName()+"->获得锁成功");
}
return true;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName()+"->释放锁"+CURRENT_LOCK);
try {
zk.delete(CURRENT_LOCK,-1);
CURRENT_LOCK=null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void process(WatchedEvent event) {
if(this.countDownLatch!=null){
this.countDownLatch.countDown();
}
}
}