专栏首页码猿技术专栏Zookeeper实现分布式锁

Zookeeper实现分布式锁

导读

  • 真是有人()的地方就有江湖(事务),今天不谈江湖,来撩撩人。
  • 分布式锁的概念、为什么使用分布式锁,想必大家已经很清楚了。前段时间作者写过Redis是如何实现分布式锁,今天这篇文章来谈谈Zookeeper是如何实现分布式锁的。
  • 陈某今天分别从如下几个方面来详细讲讲ZK如何实现分布式锁:
    1. ZK的四种节点
    2. 排它锁的实现
    3. 读写锁的实现
    4. Curator实现分步式锁

ZK的四种节点

  • 持久性节点:节点创建后将会一直存在
  • 临时节点:临时节点的生命周期和当前会话绑定,一旦当前会话断开临时节点也会删除,当然可以主动删除。
  • 持久有序节点:节点创建一直存在,并且zk会自动为节点加上一个自增的后缀作为新的节点名称。
  • 临时有序节点:保留临时节点的特性,并且zk会自动为节点加上一个自增的后缀作为新的节点名称。

排它锁的实现

  • 排他锁的实现相对简单一点,利用了zk的创建节点不能重名的特性。如下图:
  • 根据上图分析大致分为如下步骤:
    1. 尝试获取锁:创建临时节点,zk会保证只有一个客户端创建成功。
    2. 创建临时节点成功,获取锁成功,执行业务逻辑,业务执行完成后删除锁。
    3. 创建临时节点失败,阻塞等待。
    4. 监听删除事件,一旦临时节点删除了,表示互斥操作完成了,可以再次尝试获取锁。
    5. 递归:获取锁的过程是一个递归的操作,获取锁->监听->获取锁
  • 如何避免死锁:创建的是临时节点,当服务宕机会话关闭后临时节点将会被删除,锁自动释放。

代码实现

  • 作者参照JDK锁的实现方式加上模板方法模式的封装,封装接口如下:
/**
 * @Description ZK分布式锁的接口
 * @Author 陈某
 * @Date 2020/4/7 22:52
 */
public interface ZKLock {
    /**
     * 获取锁
     */
    void lock() throws Exception;

    /**
     * 解锁
     */
    void unlock() throws Exception;
}
  • 模板抽象类如下:
/**
 * @Description 排他锁,模板类
 * @Author 陈某
 * @Date 2020/4/7 22:55
 */
public abstract class AbstractZKLockMutex implements ZKLock {

    /**
     * 节点路径
     */
    protected String lockPath;

    /**
     * zk客户端
     */
    protected CuratorFramework zkClient;

    private AbstractZKLockMutex(){}

    public AbstractZKLockMutex(String lockPath,CuratorFramework client){
        this.lockPath=lockPath;
        this.zkClient=client;
    }

    /**
     * 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
     * @throws Exception
     */
    @Override
    public final void lock() throws Exception {
        //获取锁成功
        if (tryLock()){
            System.out.println(Thread.currentThread().getName()+"获取锁成功");
        }else{  //获取锁失败
            //阻塞一直等待
            waitLock();
            //递归,再次获取锁
            lock();
        }
    }

    /**
     * 尝试获取锁,子类实现
     */
    protected abstract boolean tryLock() ;


    /**
     * 等待获取锁,子类实现
     */
    protected abstract void waitLock() throws Exception;


    /**
     * 解锁:删除节点或者直接断开连接
     */
    @Override
    public  abstract void unlock() throws Exception;
}
  • 排他锁的具体实现类如下:
/**
 * @Description 排他锁的实现类,继承模板类 AbstractZKLockMutex
 * @Author 陈某
 * @Date 2020/4/7 23:23
 */
@Data
public class ZKLockMutex extends AbstractZKLockMutex {

    /**
     * 用于实现线程阻塞
     */
    private CountDownLatch countDownLatch;

    public ZKLockMutex(String lockPath,CuratorFramework zkClient){
        super(lockPath,zkClient);
    }

    /**
     * 尝试获取锁:直接创建一个临时节点,如果这个节点存在创建失败抛出异常,表示已经互斥了,
     * 反之创建成功
     * @throws Exception
     */
    @Override
    protected boolean tryLock()  {
        try {
            zkClient.create()
                    //临时节点
                    .withMode(CreateMode.EPHEMERAL)
                    //权限列表 world:anyone:crdwa
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(lockPath,"lock".getBytes());
            return true;
        }catch (Exception ex){
            return false;
        }
    }


    /**
     * 等待锁,一直阻塞监听
     * @return  成功获取锁返回true,反之返回false
     */
    @Override
    protected void waitLock() throws Exception {
        //监听节点的新增、更新、删除
        final NodeCache nodeCache = new NodeCache(zkClient, lockPath);
        //启动监听
        nodeCache.start();
        ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();

        //监听器
        NodeCacheListener listener=()-> {
            //节点被删除,此时获取锁
            if (nodeCache.getCurrentData() == null) {
                //countDownLatch不为null,表示节点存在,此时监听到节点删除了,因此-1
                if (countDownLatch != null)
                    countDownLatch.countDown();
            }
        };
        //添加监听器
        listenable.addListener(listener);

        //判断节点是否存在
        Stat stat = zkClient.checkExists().forPath(lockPath);
        //节点存在
        if (stat!=null){
            countDownLatch=new CountDownLatch(1);
            //阻塞主线程,监听
            countDownLatch.await();
        }
        //移除监听器
        listenable.removeListener(listener);
    }

    /**
     * 解锁,直接删除节点
     * @throws Exception
     */
    @Override
    public void unlock() throws Exception {
        zkClient.delete().forPath(lockPath);
    }
}

可重入性排他锁如何设计

  • 可重入的逻辑很简单,在本地保存一个ConcurrentMapkey是当前线程,value是定义的数据,结构如下:
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
  • 重入的伪代码如下:
public boolean tryLock(){
    //判断当前线程是否在threadData保存过
    //存在,直接return true
    //不存在执行获取锁的逻辑
    //获取成功保存在threadData中
}

读写锁的实现

  • 读写锁分为读锁和写锁,区别如下:
    • 读锁允许多个线程同时读数据,但是在读的同时不允许写线程修改。
    • 写锁在获取后,不允许多个线程同时写或者读。
  • 如何实现读写锁?ZK中有一类节点叫临时有序节点,上文有介绍。下面我们来利用临时有序节点来实现读写锁的功能。

读锁的设计

  • 读锁允许多个线程同时进行读,并且在读的同时不允许线程进行写操作,实现原理如下图:
  • 根据上图,获取一个读锁分为以下步骤:
    1. 创建临时有序节点(当前线程拥有的读锁或称作读节点)。
    2. 获取路径下所有的子节点,并进行从小到大排序
    3. 获取当前节点前的临近写节点(写锁)。
    4. 如果不存在的临近写节点,则成功获取读锁。
    5. 如果存在临近写节点,对其监听删除事件。
    6. 一旦监听到删除事件,重复2,3,4,5的步骤(递归)

写锁的设计

  • 线程一旦获取了写锁,不允许其他线程读和写。实现原理如下:
  • 从上图可以看出唯一和写锁不同的就是监听的节点,这里是监听临近节点(读节点或者写节点),读锁只需要监听写节点,步骤如下:
    1. 创建临时有序节点(当前线程拥有的写锁或称作写节点)。
    2. 获取路径下的所有子节点,并进行从小到大排序。
    3. 获取当前节点的临近节点(读节点和写节点)。
    4. 如果不存在临近节点,则成功获取锁。
    5. 如果存在临近节点,对其进行监听删除事件。
    6. 一旦监听到删除事件,重复2,3,4,5的步骤(递归)

如何监听

  • 无论是写锁还是读锁都需要监听前面的节点,不同的是读锁只监听临近的写节点,写锁是监听临近的所有节点,抽象出来看其实是一种链式的监听,如下图:
  • 每一个节点都在监听前面的临近节点,一旦前面一个节点删除了,再从新排序后监听前面的节点,这样递归下去。

代码实现

  • 作者简单的写了读写锁的实现,先造出来再优化,不建议用在生产环境。代码如下:
public class ZKLockRW  {

    /**
     * 节点路径
     */
    protected String lockPath;

    /**
     * zk客户端
     */
    protected CuratorFramework zkClient;

    /**
     * 用于阻塞线程
     */
    private CountDownLatch countDownLatch=new CountDownLatch(1);


    private final static String WRITE_NAME="_W_LOCK";

    private final static String READ_NAME="_R_LOCK";


    public ZKLockRW(String lockPath, CuratorFramework client) {
        this.lockPath=lockPath;
        this.zkClient=client;
    }

    /**
     * 获取锁,如果获取失败一直阻塞
     * @throws Exception
     */
    public void lock() throws Exception {
        //创建节点
        String node = createNode();
        //阻塞等待获取锁
        tryLock(node);
        countDownLatch.await();
    }

    /**
     * 创建临时有序节点
     * @return
     * @throws Exception
     */
    private String createNode() throws Exception {
        //创建临时有序节点
       return zkClient.create()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(lockPath);
    }

    /**
     * 获取写锁
     * @return
     */
    public  ZKLockRW writeLock(){
        return new ZKLockRW(lockPath+WRITE_NAME,zkClient);
    }

    /**
     * 获取读锁
     * @return
     */
    public  ZKLockRW readLock(){
        return new ZKLockRW(lockPath+READ_NAME,zkClient);
    }

    private void tryLock(String nodePath) throws Exception {
        //获取所有的子节点
        List<String> childPaths = zkClient.getChildren()
                .forPath("/")
                .stream().sorted().map(o->"/"+o).collect(Collectors.toList());


        //第一个节点就是当前的锁,直接获取锁。递归结束的条件
        if (nodePath.equals(childPaths.get(0))){
            countDownLatch.countDown();
            return;
        }

        //1. 读锁:监听最前面的写锁,写锁释放了,自然能够读了
        if (nodePath.contains(READ_NAME)){
            //查找临近的写锁
            String preNode = getNearWriteNode(childPaths, childPaths.indexOf(nodePath));
            if (preNode==null){
                countDownLatch.countDown();
                return;
            }
            NodeCache nodeCache=new NodeCache(zkClient,preNode);
            nodeCache.start();
            ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();
            listenable.addListener(() -> {
                //节点删除事件
                if (nodeCache.getCurrentData()==null){
                    //继续监听前一个节点
                    String nearWriteNode = getNearWriteNode(childPaths, childPaths.indexOf(preNode));
                    if (nearWriteNode==null){
                        countDownLatch.countDown();
                        return;
                    }
                    tryLock(nearWriteNode);
                }
            });
        }

        //如果是写锁,前面无论是什么锁都不能读,直接循环监听上一个节点即可,直到前面无锁
        if (nodePath.contains(WRITE_NAME)){
            String preNode = childPaths.get(childPaths.indexOf(nodePath) - 1);
            NodeCache nodeCache=new NodeCache(zkClient,preNode);
            nodeCache.start();
            ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();
            listenable.addListener(() -> {
                //节点删除事件
                if (nodeCache.getCurrentData()==null){
                    //继续监听前一个节点
                    tryLock(childPaths.get(childPaths.indexOf(preNode) - 1<0?0:childPaths.indexOf(preNode) - 1));
                }
            });
        }
    }

    /**
     * 查找临近的写节点
     * @param childPath 全部的子节点
     * @param index 右边界
     * @return
     */
    private String  getNearWriteNode(List<String> childPath,Integer index){
        for (int i = 0; i < index; i++) {
            String node = childPath.get(i);
            if (node.contains(WRITE_NAME))
                return node;

        }
        return null;
    }

}

Curator实现分步式锁

  • Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。
  • Curator在分布式锁方面已经为我们封装好了,大致实现的思路就是按照作者上述的思路实现的。中小型互联网公司还是建议直接使用框架封装好的,毕竟稳定,有些大型的互联公司都是手写的,牛逼啊。
  • 创建一个排他锁很简单,如下:
//arg1:CuratorFramework连接对象,arg2:节点路径
lock=new InterProcessMutex(client,path);
//获取锁
lock.acquire();
//释放锁
lock.release();
  • 更多的API请参照官方文档,不是此篇文章重点。
  • 至此ZK实现分布式锁就介绍完了,如有想要源码的朋友,老规矩,回复关键词分布式锁获取。

一点小福利

  • 对于Zookeeper不太熟悉的朋友,陈某特地花费两天时间总结了ZK的常用知识点,包括ZK常用shell命令、ZK权限控制、Curator的基本操作API。目录如下:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 用ZooKeeper实现分布式锁

    一个很典型的秒杀场景,或者说并发量非常高的场景下,对商品库存的操作,我用一个SpringBoot小项目模拟一下。

    行百里er
  • zookeeper实现分布式锁 顶

    神马是分布式锁呢,就是利用服务器集群的特性,如zookeeper或者mysql,redis对多台分布式服务器的进程进行只允许一台服务器的一个进程来进行一个同步操...

    算法之名
  • 利用zookeeper 实现分布式锁

    利用Curator(流行的 zk 客户端工具包) 的 InterProcessMute 类 import org.apache.curator.framewor...

    技术蓝海
  • 利用Zookeeper实现 - 分布式锁

    许多场景中,数据一致性是一个比较重要的话题,在单机环境中,我们可以通过Java提供的并发API来解决;而在分布式环境(会遇到网络故障、消息重复、消息丢失等各种问...

    JAVA葵花宝典
  • 使用zookeeper实现分布式锁

    在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源...

    加多
  • 如何用zookeeper 实现分布式锁

    用户2141593
  • zookeeper 实现分布式锁安全用法

    标签: zookeeper sessionExpire connectionLoss 分布式锁

    王清培
  • 【分布式锁】07-Zookeeper实现分布式锁:Semaphore、读写锁实现原理

    前面已经讲解了Zookeeper可重入锁的实现原理,自己对分布式锁也有了更深的认知。

    一枝花算不算浪漫
  • 分布式进阶__如何用zookeeper 实现分布式锁

            分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的共享资源实现互斥访问,以达到保证数据的一致性。

    矿泉水
  • Redis实现分布式锁与Zookeeper实现分布式锁区别

    在学习过程中,简单的整理了一些redis跟zookeeper实现分布式锁的区别,有需要改正跟补充的地方,希望各位大佬及时指出

    Java周某人
  • 【分布式锁】06-Zookeeper实现分布式锁:可重入锁源码分析

    前面已经讲解了Redis的客户端Redission是怎么实现分布式锁的,大多都深入到源码级别。

    一枝花算不算浪漫
  • 分布式专题|如何使用zookeeper实现分布式锁

    在分布式中,避免不了使用分布式锁,在前面的专题中,我们已经说过使用Redis实现分布式锁,这里我将给大家演示如何使用zookeeper实现分布式锁。

    AI码师
  • 利用Zookeeper实现分布式锁及服务注册中心

    对于Zookeeper的定义以及原理,网上已经有很多的优秀文章对其进行了详细的介绍,所以本文不再进行这方面的阐述。 本文主要介绍一些基本的准备工作以及zooke...

    蓝夏
  • Zookeeper实现分布式锁详细步骤,你一定要知道

    前几天分享了分布式锁的三种实现方案(我们是这样一步一步实现分布式锁的),其中对于zookeeper实现方式,有些朋友说想知道实现的总体流程。那么今天我就来将zo...

    架构师修炼
  • 如何用Zookeeper来实现分布式锁?

    海仔
  • Zookeeper怎么实现分布式锁?

    https://segmentfault.com/a/1190000010895869

    Java技术栈
  • Zookeeper和Redis实现分布式锁

    在分布式系统中,为保证同一时间只有一个客户端可以对共享资源进行操作,需要对共享资源加锁来实现,常见有三种方式:

    用户1278550
  • Java实现Zookeeper分布式锁

    我们常说的锁是单进程多线程锁,在多线程并发编程中,用于线程之间的数据同步,保护共享资源的访问。而分布式锁,指在分布式环境下,保护跨进程、跨主机、跨网络的共享资源...

    chenchenchen
  • 基于zookeeper或redis实现分布式锁

    在分布式系统中,分布式锁是为了解决多实例之间的同步问题。例如master选举,能够获取分布式锁的就是master,获取失败的就是slave。又或者能够获取锁的实...

    用户2937493

扫码关注云+社区

领取腾讯云代金券