前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >zookeeper分布式协调机制及创建分布式锁

zookeeper分布式协调机制及创建分布式锁

作者头像
神秘的寇先森
发布2018-09-29 15:10:29
5190
发布2018-09-29 15:10:29
举报
文章被收录于专栏:Java进阶之路Java进阶之路

zookeeper基本概念

要了解zookeeper如何创建分布式锁,先了解一下zookeeper。zookeeper官网给出解释:Apache ZooKeeper致力于开发和维护开源服务器,实现高度可靠的分布式协调。

Zookeeper,一种分布式应用的协作服务,是Google的Chubby一个开源的实现,是Hadoop的分布式协调服务,它包含一个简单的原语集,应用于分布式应用的协作服务,使得分布式应用可以基于这些接口实现诸如同步、配置维护和分集群或者命名的服务。ZooKeeper:提供通用的分布式锁服务,用以协调分布式应用.

zookeeper工作原理

zookeeper的核心是原子广播,这个机制保证了各个server之间的同步,实现这个机制的协议叫做Zab协议.Zab协议有两种模式,他们分别是恢复模式和广播模式.   1.当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导着被选举出来,且大多数server都完成了和leader的状态同步后,恢复模式就结束了.状态同步保证了leader和server具有相同的系统状态.   2.一旦leader已经和多数的follower进行了状态同步后,他就可以开始广播消息了,即进入广播状态.这时候当一个server加入zookeeper服务中,它会在恢复模式下启动,发下leader,并和leader进行状态同步,待到同步结束,它也参与广播消息.

zookeeper的数据模型

层次化的目录结构,命名符合常规文件系统规范 每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识 节点Znode可以包含数据和子节点,但是EPHEMERAL类型的节点不能有子节点 Znode中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本 客户端应用可以在节点上设置监视器,节点不支持部分读写,而是一次性完整读写 Zoopkeeper 提供了一套很好的分布式集群管理的机制,就是它这种基于层次型的目录树的数据结构,并对树中的节点进行有效管理,从而可以设计出多种多样的分布式的数据管理模型。

Zookeeper的节点

Znode有两种类型,短暂的(ephemeral)和持久的(persistent) Znode的类型在创建时确定并且之后不能再修改 短暂znode的客户端会话结束时,zookeeper会将该短暂znode删除,短暂znode不可以有子节点 持久znode不依赖于客户端会话,只有当客户端明确要删除该持久znode时才会被删除

Znode有四种形式的目录节点,PERSISTENT、PERSISTENT_SEQUENTIAL、EPHEMERAL、EPHEMERAL_SEQUENTIAL.

znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性, 通过这个特性可以实现的功能包括配置的集中管理,集群管理,分布式锁等等.

Zookeeper的角色

领导者(leader),负责进行投票的发起和决议,更新系统状态 学习者(learner),包括跟随者(follower)和观察者(observer). follower用于接受客户端请求并想客户端返回结果,在选主过程中参与投票 Observer可以接受客户端连接,将写请求转发给leader,但observer不参加投票过程,只同步leader的状态,observer的目的是为了扩展系统,提高读取速度 客户端(client),请求发起方.

Watcher

Watcher 在 ZooKeeper 是一个核心功能,Watcher 可以监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的 Watcher,从而每个客户端都很快知道它所关注的目录节点的状态发生变化,而做出相应的反应

可以设置观察的操作:exists,getChildren,getData 可以触发观察的操作:create,delete,setData

znode以某种方式发生变化时,“观察”(watch)机制可以让客户端得到通知. 可以针对ZooKeeper服务的“操作”来设置观察,该服务的其他 操作可以触发观察. 比如,客户端可以对某个客户端调用exists操作,同时在它上面设置一个观察,如果此时这个znode不存在,则exists返回 false,如果一段时间之后,这个znode被其他客户端创建,则这个观察会被触发,之前的那个客户端就会得到通知. zookeeper的客户端封装的比较好的现在要属Apache Curator,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。详情见:https://www.cnblogs.com/seaspring/p/5536338.html

zookeeper分布式锁的实现

上面讲解已经说明了zookeeper主要是通过节点znode的变化来控制全局锁的,下面我用代码具体呈现。

代码语言:javascript
复制
public class ZookeeperClient {

    private String zkAddr;
    private int timeOut;
    private String authSchema;
    private String authInfo;
    private CuratorFramework client;


    public ZookeeperClient(String zkAddr, int timeOut, String namespace) throws Exception {

        this(zkAddr, timeOut, namespace, null);

    }

    /**
     * 获取zk 连接客户端
     *
     * @param zkAddr    zk地址 ip:port,ip:port,ip:port
     * @param timeOut   连接超时ms
     * @param namespace 所有的操作都是在 /namespace 下的节点操作
     * @param acl       Access Control List(访问控制列表)。Znode被创建时带有一个ACL列表<br>
     *                  acl 主要由三个维度:schema,id,permision 控制节点权限 <br>
     *                  eg:<br>
     *                  Id id = new Id("digest", DigestAuthenticationProvider.generateDigest("username:password"));<br>
     *                  ACL acl = new ACL(ZooDefs.Perms.ALL, id); <br>
     *                  <br>
     *                  维度 schema: <br>
     *                  1:digest 用户名+密码验证 它对应的维度id=username:BASE64(SHA1(password))<br>
     *                  2:host 客户端主机名hostname验证 <br>
     *                  3:ip 它对应的维度id=客户机的IP地址,设置的时候可以设置一个ip段,比如ip:192.168.1.0/16, 表示匹配前16个bit的IP段<br>
     *                  4:auth 使用sessionID验证 <br>
     *                  5:world 无验证,默认是无任何权限  它下面只有一个id, 叫anyone  <br>
     *                  6:super: 在这种scheme情况下,对应的id拥有超级权限,可以做任何事情(cdrwa)  <br>
     *                  7:sasl: sasl的对应的id,是一个通过了kerberos认证的用户id  <br>
     *                  <br>
     *                  维度:permision <br>
     *                  ZooDefs.Perms.READ 读权限<br>
     *                  ZooDefs.Perms.WRITE 写权限<br>
     *                  ZooDefs.Perms.CREATE 创建节点权限<br>
     *                  ZooDefs.Perms.DELETE 删除节点权限<br>
     *                  ZooDefs.Perms.ADMIN 能设置权限<br>
     *                  ZooDefs.Perms.ALL 所有权限<br>
     *                  ALL = READ | WRITE | CREATE | DELETE | ADMIN<br>
     * @throws Exception
     */

    public ZookeeperClient(String zkAddr, int timeOut, String namespace, ACL acl) throws Exception {
        this.zkAddr = zkAddr;
        if (timeOut > 0) {
            this.timeOut = timeOut;
        }
        if (null != acl) {
            this.authSchema = acl.getId().getScheme();
            this.authInfo = acl.getId().getId();
        }
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory
                .builder().connectString(this.zkAddr).namespace(StringUtils.isEmpty(namespace) ? "" : namespace)
                .connectionTimeoutMs(this.timeOut)
                .retryPolicy(new RetryNTimes(5, 10));
        if ((!StringUtils.isBlank(this.authSchema))
                && (!StringUtils.isBlank(this.authInfo))) {
            builder.authorization(this.authSchema, this.authInfo.getBytes());
        }
        System.out.println("namespace:"+namespace);
        this.client = builder.build();
        this.client.start();
        this.client.blockUntilConnected(5, TimeUnit.SECONDS);

    }


    /**
     * 创建一个所有权限节点即schema:world;id:annyone;permision:ZooDefs.Perms.ALL
     *
     * @param nodePath   创建的结点路径
     * @param data       节点数据
     * @param createMode 节点模式
     * @param recursion  当父目录不存在是否创建 true:创建,fasle:不创建
     * @throws Exception
     */
    public void createNode(String nodePath, String data, CreateMode createMode, boolean recursion)
            throws Exception {

        createNode(nodePath, ZooDefs.Ids.OPEN_ACL_UNSAFE, data, createMode, recursion);
    }


    /**
     * 创建节点
     *
     * @param nodePath   创建节点的路径
     * @param acls       节点控制权限列表
     * @param data       节点存放的数据
     * @param createMode 创建节点的模式
     * @param recursion  当父目录不存在是否创建 true:创建,fasle:不创建
     *                   节点模式CreateMode<br>
     *                   1:CreateMode.EPHEMERAL 创建临时节点;该节点在客户端掉线的时候被删除<br>
     *                   2:CreateMode.EPHEMERAL_SEQUENTIAL  临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点(可做分布式锁)<br>
     *                   3:CreateMode.PERSISTENT 持久化目录节点,存储的数据不会丢失。<br>
     *                   4:CreateMode.PERSISTENT_SEQUENTIAL  顺序自动编号的持久化目录节点,存储的数据不会丢失,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名<br>
     * @throws Exception
     */
    public void createNode(String nodePath, List<ACL> acls, String data,
                           CreateMode createMode, boolean recursion) throws Exception {
        byte[] bytes = null;
        if (!StringUtils.isBlank(data)) {
            bytes = data.getBytes("UTF-8");
        }
        createNode(nodePath, acls, bytes, createMode, recursion);
    }

    /**
     * @param nodePath   创建节点的路径
     * @param acls       节点控制权限列表
     * @param data       节点存放的数据
     * @param createMode 创建节点的模式
     * @param recursion  当父目录不存在是否创建 true:创建,fasle:不创建
     *                   节点模式CreateMode<br>
     *                   1:CreateMode.EPHEMERAL 创建临时节点;该节点在客户端掉线的时候被删除<br>
     *                   2:CreateMode.EPHEMERAL_SEQUENTIAL  临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点(可做分布式锁)<br>
     *                   3:CreateMode.PERSISTENT 持久化目录节点,存储的数据不会丢失。<br>
     *                   4:CreateMode.PERSISTENT_SEQUENTIAL  顺序自动编号的持久化目录节点,存储的数据不会丢失,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名<br>
     * @throws Exception
     */
    public void createNode(String nodePath, List<ACL> acls, byte[] data,
                           CreateMode createMode, boolean recursion) throws Exception {
        if (recursion) {
            ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client
                    .create().creatingParentsIfNeeded().withMode(createMode))
                    .withACL(acls)).forPath(nodePath, data);
        } else {
            ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client
                    .create().withMode(createMode))
                    .withACL(acls)).forPath(nodePath, data);
        }
    }

    /**
     * 创建一个所有权限的永久节点
     *
     * @param nodePath
     * @param data
     * @param recursion 当父目录不存在是否创建 true:创建,fasle:不创建
     * @throws Exception
     */
    public void createPersitentNode(String nodePath, String data, boolean recursion) throws Exception {

        createNode(nodePath, data, CreateMode.PERSISTENT, recursion);
    }

    /**
     * 创建一个所有权限的零时节点
     *
     * @param nodePath
     * @param data
     * @param recursion 当父目录不存在是否创建 true:创建,fasle:不创建
     * @throws Exception
     */
    public void createEphemeralNode(String nodePath, String data, boolean recursion) throws Exception {

        createNode(nodePath, data, CreateMode.EPHEMERAL, recursion);
    }

    /**
     * 创建一个带权限的永久节点
     *
     * @param nodePath
     * @param data
     * @param recursion 当父目录不存在是否创建 true:创建,fasle:不创建
     * @throws Exception
     */
    public void createPersitentNodeWithAcl(String nodePath, String data, List<ACL> acls, boolean recursion) throws Exception {

        createNode(nodePath, acls, data, CreateMode.PERSISTENT, recursion);
    }

    /**
     * 创建一个带权限的临时节点
     *
     * @param nodePath
     * @param data
     * @param recursion 当父目录不存在是否创建 true:创建,fasle:不创建
     * @throws Exception
     */
    public void createEphemeralNodeAcl(String nodePath, String data, List<ACL> acls, boolean recursion) throws Exception {

        createNode(nodePath, acls, data, CreateMode.EPHEMERAL, recursion);
    }


    /**
     * 创建序列节点且当父节点不存在时创建父节点
     *
     * @param nodePath
     * @param acls       可参考:ZooDefs.Ids
     * @param createMode
     * @param recursion  当父目录不存在是否创建 true:创建,fasle:不创建
     * @throws Exception
     */
    public void createSeqNode(String nodePath, List<ACL> acls, CreateMode createMode, boolean recursion) throws Exception {
        if (recursion) {
            ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client
                    .create().creatingParentsIfNeeded()
                    .withMode(createMode))
                    .withACL(acls)).forPath(nodePath);
        } else {
            ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client
                    .create()
                    .withMode(createMode))
                    .withACL(acls)).forPath(nodePath);
        }
    }

    /**
     * 存在返回 节点stat 信息;否则返回null
     *
     * @param path
     * @return
     * @throws Exception
     */
    public Stat exists(String path) throws Exception {

        return this.client.checkExists().forPath(path);
    }

    /**
     * 判断节点是否存在,存在则注册节点监视器
     *
     * @param path
     * @param watcher
     * @return
     */
    public boolean exists(String path, Watcher watcher) throws Exception {

        if (null != watcher) {
            return null != ((BackgroundPathable<?>) this.client.checkExists().usingWatcher(watcher)).forPath(path);
        }
        return null != this.client.checkExists().forPath(path);
    }

    /**
     * 判断是否处于连接状态
     *
     * @return
     */
    public boolean isConnected() {

        if ((null == this.client)
                || (!CuratorFrameworkState.STARTED.equals(this.client
                .getState()))) {
            return false;
        }
        return true;
    }

    public void retryConnection() {
        this.client.start();
    }

    /**
     * 获取连接客户端
     *
     * @return
     */
    public CuratorFramework getInnerClient() {

        return this.client;

    }

    /**
     * 关闭连接
     */
    public void quit() {

        if ((null != this.client)
                && (CuratorFrameworkState.STARTED
                .equals(this.client.getState()))) {
            this.client.close();
        }
    }


    /**
     * 删除节点
     *
     * @param path
     * @param deleChildren
     * @throws Exception
     */
    public void deleteNode(String path, boolean deleChildren) throws Exception {

        if (deleChildren) {
            this.client.delete().guaranteed().deletingChildrenIfNeeded()
                    .forPath(path);
        } else {
            this.client.delete().forPath(path);
        }
    }

    /**
     * 设置节点数据
     *
     * @param nodePath
     * @param data
     * @throws Exception
     */
    public void setNodeData(String nodePath, String data) throws Exception {

        byte[] bytes = null;
        if (!StringUtils.isBlank(data)) {
            bytes = data.getBytes("UTF-8");
        }
        setNodeData(nodePath, bytes);
    }

    /**
     * 设置节点数据
     *
     * @param nodePath
     * @param data
     * @throws Exception
     */
    public void setNodeData(String nodePath, byte[] data) throws Exception {
        this.client.setData().forPath(nodePath, data);
    }

    public String getNodeData(String nodePath, boolean watch) throws Exception {
        byte[] data;
        if (watch) {
            data = (byte[]) ((BackgroundPathable<?>) this.client.getData()
                    .watched()).forPath(nodePath);
        } else {
            data = (byte[]) this.client.getData().forPath(nodePath);
        }
        if ((null == data) || (data.length <= 0)) {
            return null;
        }
        return new String(data, "UTF-8");
    }

    public String getNodeData(String nodePath) throws Exception {
        return getNodeData(nodePath, false);
    }

    public String getNodeData(String nodePath, Watcher watcher)
            throws Exception {
        byte[] data = getNodeBytes(nodePath, watcher);
        return new String(data, "UTF-8");
    }

    public byte[] getNodeBytes(String nodePath, Watcher watcher)
            throws Exception {
        byte[] bytes = null;
        if (null != watcher) {
            bytes = (byte[]) ((BackgroundPathable<?>) this.client.getData()
                    .usingWatcher(watcher)).forPath(nodePath);
        } else {
            bytes = (byte[]) this.client.getData().forPath(nodePath);
        }
        return bytes;
    }

    public byte[] getNodeBytes(String nodePath) throws Exception {
        return getNodeBytes(nodePath, null);
    }


    @SuppressWarnings("unchecked")
    public List<String> getChildren(String nodePath, Watcher watcher)
            throws Exception {
        return (List<String>) ((BackgroundPathable<?>) this.client
                .getChildren().usingWatcher(watcher)).forPath(nodePath);
    }

    public List<String> getChildren(String path) throws Exception {
        return (List<String>) this.client.getChildren().forPath(path);
    }

    @SuppressWarnings("unchecked")
    public List<String> getChildren(String path, boolean watcher)
            throws Exception {
        if (watcher) {
            return (List<String>) ((BackgroundPathable<?>) this.client
                    .getChildren().watched()).forPath(path);
        }
        return (List<String>) this.client.getChildren().forPath(path);
    }


    public ZookeeperClient addAuth(String authSchema, String authInfo)
            throws Exception {
        synchronized (ZookeeperClient.class) {
            this.client.getZookeeperClient().getZooKeeper()
                    .addAuthInfo(authSchema, authInfo.getBytes());
        }
        return this;
    }

    /**
     * 分布式锁
     *
     * @param lockPath
     * @return
     */
    public InterProcessLock getInterProcessLock(String lockPath) {
        return new InterProcessMutex(this.client, lockPath);
    }
}

/**
*工厂类,保证单例
*/
public class ZkClientUtils {
    private static ZookeeperClient zkClient;
    private static int timeOut=30;
    public static ZookeeperClient getZkClient(String zkAddr,String namespace,ACL acl) throws Exception{
        if(zkClient!=null){
            return zkClient;
        }
        synchronized (ZkClientUtils.class) {
            if(null != zkClient){
                return zkClient;
            }
            zkClient = new ZookeeperClient(zkAddr,timeOut, namespace, acl);
        }
        return zkClient;
    }
  }

上面这段代码是zookeeper的客户端以及所有分布式锁操作的方法实现,下面的代码是如何利用分布式锁来控制分布式事务。

代码语言:javascript
复制
//创建一个永久节点作为全局锁
public static void testCreateNode(){
       try {
           ZookeeperClient zkClient = ZkClientUtils.getZkClient(address,"ns", null);
           zkClient.createPersitentNode("/test/qaz/t2/t/t", "data", true);
       } catch (Exception e) {

           e.printStackTrace();
       }
   }
//测试分布式锁
   public static void testDistributeLock(){

       for(int i=0;i<50;i++){
           new Thread(){
               @Override
               public void run() {
                   InterProcessLock lock = null;
                   try{
                       ZookeeperClient zkClient = ZkClientUtils.getZkClient(address,"dislock", null);
                       lock = zkClient.getInterProcessLock("/distributeLock");
                       System.out.println(Thread.currentThread().getName()+"申请锁");
                       lock.acquire();
                       System.out.println(Thread.currentThread().getName()+"持有锁");
                       Thread.sleep(500);
                   }
                   catch(Exception e){
                       e.printStackTrace();
                   }
                   finally{
                       if(null != lock){
                           try {
                               lock.release();
                               System.out.println(Thread.currentThread().getName()+"释放有锁");
                           } catch (Exception e) {
                               e.printStackTrace();
                           }
                       }
                   }
               }

           }.start();
       }
   }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.09.12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • zookeeper基本概念
  • zookeeper分布式锁的实现
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档