前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Zookeeper之Watch机制、分布式锁的实现

Zookeeper之Watch机制、分布式锁的实现

作者头像
不蜇人的小蜜蜂
发布2019-08-29 13:06:49
1.9K0
发布2019-08-29 13:06:49
举报

Zookeeper作为分布式协调组件,在分布式的架构中承担着不可替代的作用,在与dubbo组合的架构中承担着注册中心的重任,有些SC项目也会采用Zookeeper作为配置中心,更多的时候,zookeeper作为分布式锁的的重要实现手段。现来总结一下Zookeeper的watch机制,一方面能够深入加强自己对zk的理解应用,另一方面也能更好地把控zk对我们项目的支持。

先简单总结一下ZK的watch机制:

watch机制是ZK比较重要的一个特性,zk允许我们在创建节点、改变节点数据、删除节点、子节点发生变化的时候添加事件监听(watch)用来对当前事件进行监听。当数据发生变化的时候 ,zk产生一个watcher事件推送到客户端,但这只能监听到一次,可以通过循环监听实现永久监听效果。

那什么时候才会触发watch事件呢?

凡是事务类型的操作都会触发watch事件,例如create/delete/setData等

如何注册监听呢?

可用通过以下三个方式绑定:getData()、Exists()、getChildren().

watcher 事件类型

None (-1), 客户端链接状态发生变化的时候,会 收到 none 的事件 NodeCreated (1), 创建节点的事件。 比如 zkpersis-mic

NodeDeleted (2), 删除节点的事件

NodeDataChanged (3), 节点数据发生变更

NodeChildrenChanged (4); 子节点被创建、被删除、会 发生事件触发

附录利用ZKClient和Curator两种方式进行简单API的操作

代码语言:javascript
复制
public static void main(String[] args) throws IOException {
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
                countDownLatch.countDown();
            }
        }
    });
    try {
        countDownLatch.await();//在真正的状态变为Connected之前一直阻塞,知道执行countDown()方法
        System.out.println(zooKeeper.getState());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    try {
       // String string = zooKeeper.create("/study","hello".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        //System.out.println(string);
        String message = new String(zooKeeper.getData("/study", null, new Stat()));
        System.out.println("message -> "+message);
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    try {
        Stat stat = zooKeeper.setData("/hello","world".getBytes(), Integer.parseInt(Version.getVersion()));
        System.out.println("Stat -> "+stat.toString());
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

利用原生的ZkClient实现节点的CRUD

利用Curator客户端实现相同功能

创建连接的两种方式:采用静态工厂、builder模式

代码语言:javascript
复制
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                                 .connectString("127.0.0.1:2181")
                                 .sessionTimeoutMs(100000)
                                 .connectionTimeoutMs(5000)
                                 .retryPolicy(
                                     new ExponentialBackoffRetry(1000,3))
                                 .namespace("test")
                                 .build();
//两种创建方式
CuratorFramework client = CuratorFrameworkFactory
        .newClient("127.0.0.1:2181",5000,5000,
                new ExponentialBackoffRetry(1000,3));

创建一个持久节点/cui

代码语言:javascript
复制
curatorFramework.start();//开启客户端功能
System.err.println(curatorFramework.getState());
String path = curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/cui","hello world".getBytes());
System.out.println("path -> "+path+", and value -> "
            +curatorFramework.getData().forPath("/cui"));

创建节点模式有以下四种:

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且带序列号
  • EPHEMERAL:临时
  • EPHEMERAL_SEQUENTIAL:临时并且带序列号

一般的我们会依据这四种节点特性去实现不同的功能,例如接下来我们会采用ZK的临时有序节点去完成分布式锁的实现。

实现基本的CRUD都有相应的API操作,且操作简单,都是采用流式API的风格。Curator为我们提供了事务支持,即多个对节点的操作被看作是一个原子操作,例如下面的例子:

代码语言:javascript
复制
curatorFramework.inTransaction().check().forPath("/transaction")
        .and().
             setData().forPath("/transaction","transaction".getBytes())
        .and().
             create().forPath("/transaction1","transaction1".getBytes())
        .and().
             commit();

同时Curator也为我们提供了异步操作,主要依赖于BackgroundCallback接口

如下所示:

代码语言:javascript
复制
Executor executor = Executors.newFixedThreadPool(2);
curatorFramework.create()
        .creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
             .inBackground((curator, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
        },executor)
        .forPath("/path");

System.in.read();//阻塞住,方便查看异步操作后的效果

缓存

Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。

PathCache:监控路径下的子节点

NodeCache:监控特定的节点

TreeCache:监控所有的节点

代码语言:javascript
复制
curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
    @Override
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        if (connectionState.isConnected()){
            try {
                String path = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/listener");
                System.err.println("path -> "+path);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
},Executors.newFixedThreadPool(1));
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/cui");

分布式锁的实现:(基于ZKCliet实现)

代码语言:javascript
复制
public class DistributeLock implements Lock,Watcher {

    private ZooKeeper zk=null;
    private String ROOT_LOCK="/locks"; //定义根节点
    private String WAIT_LOCK; //等待前一个锁
    private String CURRENT_LOCK; //表示当前的锁

    private CountDownLatch countDownLatch; //


    public DistributeLock() {

        try {
            zk=new ZooKeeper("192.168.11.153:2181",
                    4000,this);
            //判断根节点是否存在
            Stat stat=zk.exists(ROOT_LOCK,false);
            if(stat==null){
                try {
                    zk.create(ROOT_LOCK,"0".getBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

    @Override
    public boolean tryLock() {

        try {
            //创建临时有序节点
            CURRENT_LOCK=zk.create(ROOT_LOCK+"/","0".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(Thread.currentThread().getName()+"->"+
                    CURRENT_LOCK+",尝试竞争锁");
            List<String> childrens=zk.getChildren(ROOT_LOCK,false); //获取根节点下的所有子节点
            SortedSet<String> sortedSet=new TreeSet();//定义一个集合进行排序
            for(String children:childrens){
                sortedSet.add(ROOT_LOCK+"/"+children);
            }
            String firstNode=sortedSet.first(); //获得当前所有子节点中最小的节点
            SortedSet<String> lessThenMe=((TreeSet<String>) sortedSet).headSet(CURRENT_LOCK); //
            if(CURRENT_LOCK.equals(firstNode)){//通过当前的节点和子节点中最小的节点进行比较,如果相等,表示获得锁成功
                return true;
            }
            if(!lessThenMe.isEmpty()){
                WAIT_LOCK=lessThenMe.last();//获得比当前节点更小的最后一个节点,设置给WAIT_LOCK
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }


    @Override
    public void lock() {
        if(this.tryLock()){ //如果获得锁成功
            System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+"->获得锁成功");
            return;
        }
        try {
            waitForLock(WAIT_LOCK); //没有获得锁,继续等待获得锁
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private boolean waitForLock(String prev) throws KeeperException, InterruptedException {
        Stat stat=zk.exists(prev,true);//监听当前节点的上一个节点
        if(stat!=null){
            System.out.println(Thread.currentThread().getName()+"->等待锁"+ROOT_LOCK+"/"+prev+"释放");
            countDownLatch=new CountDownLatch(1);
            countDownLatch.await();
            //TODO  watcher触发以后,还需要再次判断当前等待的节点是不是最小的
            System.out.println(Thread.currentThread().getName()+"->获得锁成功");
        }
        return true;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        System.out.println(Thread.currentThread().getName()+"->释放锁"+CURRENT_LOCK);
        try {
            zk.delete(CURRENT_LOCK,-1);
            CURRENT_LOCK=null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    @Override
    public void process(WatchedEvent event) {
        if(this.countDownLatch!=null){
            this.countDownLatch.countDown();
        }
    }
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-09-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 BeeFarm 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 缓存
相关产品与服务
微服务引擎 TSE
微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档