前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Zookeeper分布式锁实现(zk怎么实现分布式锁)

Zookeeper分布式锁实现(zk怎么实现分布式锁)

作者头像
全栈程序员站长
发布2022-07-28 18:20:17
1.7K0
发布2022-07-28 18:20:17
举报

大家好,又见面了,我是你们的朋友全栈君。

如题,使用zookeeper实现分布式锁

时隔多日又来水文章了,距离上一篇好像过去很久了,现在回头看看之前写的那些东西,只能称之为“垃圾”。今天分享一个基于zookeeper实现的分布式锁简单案例,此案例仅实现了分布式锁的功能,代码优化等一概不扯。

下面先来聊聊其实现的核心思想:

首先用到zookeeper中的两个重要知识点:1、zookeeper中的节点类型:临时节点、临时有序节点、持久节点、持久有序节点。临时节点跟session关联。2、zookeeper的watch。以上两点就是实现分布式锁的核心点。

1、创建一个节点lock作为锁的根节点,当有线程需要抢锁的时候在该节点下创建一个临时有序节点

2、节点创建成功后,获取当前根节点下的所有孩子节点列表,并将自己阻塞住

3、因为获取到的子节点列表是无序的,所以需要先对子节点进行排序,然后判断自己是不是当前的第一个子节点,如果自己是第一个子节点说明抢到锁可以执行业务代码

4、如果自己不是第一个子节点,获取到自己当前在列表中索引,去监听自己的前一个节点,也就是自己的索引 index -1 (这里的监听前一个节点为核心,如果我们去监听根节点,那么一个节点的删除就需要回调所有的子节点代价太大,所以是监听前一个节点)

5、当获得锁的节点执行释放锁,也就是删除自己的节点时,后边监听的节点收到回调事件后再去获取所有的子节点,再去判断自己是不是第一个,执行抢锁操作

以上几步,便是实现分布式锁的核心思想。下面将实现的代码贴出来。

代码部分

1、pom.xml

代码语言:javascript
复制
 <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.7.0</version>
        </dependency>

2、ZKUtils.java 获取zookeeper实例的工具类

代码语言:javascript
复制
package com.bx.wx.system.zk;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/16
 */
public class ZKUtils {

    private static ZooKeeper zooKeeper;

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static ZooKeeper getZooKeeper() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("ip:2181,ip:2182,ip:2183/testConfig/lock", 3000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                Event.KeeperState state = event.getState();
                switch (state) {
                    case Unknown:
                        break;
                    case Disconnected:
                        break;
                    case NoSyncConnected:
                        break;
                    case SyncConnected:
                        countDownLatch.countDown();
                        break;
                    case AuthFailed:
                        break;
                    case ConnectedReadOnly:
                        break;
                    case SaslAuthenticated:
                        break;
                    case Expired:
                        break;
                    case Closed:
                        break;
                }

            }
        });
        countDownLatch.await();
        return zooKeeper;
    }
}

3、ZKLockUtils.java 实现了分布式锁的工具类

代码语言:javascript
复制
package com.bx.wx.system.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/21
 */
public class ZKLockUtils implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback, AsyncCallback.Children2Callback, AsyncCallback.StringCallback {

    /**
     * 这里可通过set方法或者构造方法,传入zooKeeper,
     */
    private ZooKeeper zooKeeper;

    /**
     * 当前节点的path
     */
    private String pathName;

    /**
     * 当前线程的名字,便于查看
     */
    private String threadName;

    /**
     * 用于获取不到锁时候阻塞
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * v 1.0
     * 加锁方法
     * 基础版本,功能实现了,后续再进行优化吧
     */
    public  void lock(){
        /**
         * 思路.....
         * 1、在锁目录下创建自己的节点,临时有序节点
         * 2、获取所有的孩子节点、判断自己是不是第一个
         * 3、如果自己是第一个,则加锁成功,执行业务代码
         * 4、如果自己不是第一个,watch自己的前一个节点
         * 5、当第一个节点,也就是获取锁的执行完之后,删除自己的节点
         * 6、第二个就能监听到,从而继续执行获取所有孩子节点,判断自己是不是第一个的操作
         */
        try {
            zooKeeper.create("/lock", "lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,threadName);
            //当前线程阻塞,进行抢锁
            countDownLatch.await();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 解锁方法
     * 执行完业务后,删除掉自己的节点即可 version为-1  忽略数据版本
     */
    public  void ulock(){
        //删除自己的节点
        try {
            zooKeeper.delete(pathName,-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    /**
     * Children2Callback 接口
     * 获取节点下所有孩子
     * 实现分布式锁的核心点
     * @param rc
     * @param path
     * @param ctx
     * @param children
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        if(children != null && children.size()>0){
            //对节点进行排序
            Collections.sort(children);
            String currentPath = pathName.substring(1);
            //查询自己是第几个
            int index = children.indexOf(currentPath);
            //判断自己是不是第一个
            if(index<1){
                try {
                    //如果自己是第一个,则认为抢到了锁
                    System.out.println(threadName+"抢到锁了..");
                    zooKeeper.setData("/",threadName.getBytes(),-1);
                    countDownLatch.countDown();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }else{
                //只监听自己的前一个
                zooKeeper.exists("/"+children.get(index-1),this,this,"abc");
            }
        }
    }

    /**
     * 节点创建成功时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param name
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        pathName = name;
        System.out.println(threadName+"-节点创建成功:"+pathName);
        //处的watch为false,表示不需要对根节点下的所有节点进行watch,我们只需要监听自己的前一个即可
        zooKeeper.getChildren("/",false,this,"abc");
    }

    /**
     * DataCallback接口
     * 当getdata有数据时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param data
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        //TODO
    }

    /**
     * StatCallback接口
     * 判断节点是否存在时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //TODO
        /*if(stat != null){
            zooKeeper.getData("/lock",this,this,"abc");
        }*/
    }

    /**
     * Watcher 接口
     * 节点的事件回调
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        Event.EventType eventType = event.getType();
        String path = event.getPath();
        switch (eventType) {
            case None:
                break;
            case NodeCreated:
                System.out.println("节点被创建...");
                break;
            case NodeDeleted:
                //当前一个节点被删除,判断自己是不是第一个
                System.out.println(path+"-节点被删除...");
                //执行获取所有孩子节点的操作
                zooKeeper.getChildren("/",false,this,"abc");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }


}

4、ZkLockTest.java 测试类

代码语言:javascript
复制
package com.bx.wx.system.zk;

import org.apache.zookeeper.ZooKeeper;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/21
 */
public class ZkLockTest {

    public static void main(String[] args)throws Exception {
        ZooKeeper zooKeeper = ZKUtils.getZooKeeper();
        //模拟多线程请求
        for (int i = 0; i < 5; i++) {
            String threadName = "LockThread-"+i;
            new Thread(()->{
                ZKLockUtils lockUtils = new ZKLockUtils();
                lockUtils.setZooKeeper(zooKeeper);
                lockUtils.setThreadName(threadName);
                //加锁
                lockUtils.lock();
                System.out.println(Thread.currentThread().getName()+"正在执行任务");
                //模拟执行任务
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //解锁
                lockUtils.ulock();
            },threadName).start();
        }
    }
}

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/128665.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022年4月1,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 如题,使用zookeeper实现分布式锁
  • 代码部分
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档