专栏首页Liusy01ZK实现分布式锁

ZK实现分布式锁

上一篇说了ZK是什么以及能干什么,今儿这篇就来用ZK实现分布式锁,分别用java原生的zookeeper客户端、ZKClient实现。

一、分布式锁

分布式锁的思路是每个客户端都在某个目录下注册一个临时有序节点,每次最小的节点会获取锁,当前节点会去监听上一个较小节点,如果较小节点失效之后,就会去获取锁。

java原生zookeeper客户端

(1)引入jar包

(2)创建ZK客户端连接单例

public class ZookeeperClient {
    //zk集群地址
    public static final String ZOOKEEPER_CONNECT="192.168.197.100:2181,192.168.197.110:2181,192.168.197.120:2181";
    //计数器,用于等待连接成功
    public static CountDownLatch countDownLatch = new CountDownLatch(1);
    //连接超时时间 
    public static final int SESSION_TIMEOUT = 5000;
    //用volatile修饰单例,防止赋值时发生指令重排
    private volatile static ZooKeeper instance;
    //用Double check获取单例
    public static ZooKeeper getInstance() throws IOException, InterruptedException {
        if (instance == null ){
            synchronized (ZookeeperClient.class) {
                if (instance == null) {
                    //连接时注册一个监听,监听连接状态变化
                    instance = new ZooKeeper(ZOOKEEPER_CONNECT, SESSION_TIMEOUT, new Watcher() {
                        //监听回调方法
                        @Override
                        public void process(WatchedEvent watchedEvent) {
                            //当连接状态变成connected,就说明连接成功
                            if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                                countDownLatch.countDown();
                            }
                        }
                    });
                    //等待连接成功
                    countDownLatch.await();
                }
            }
        }
        return instance;
    }

    public static int getSessionTimeout() {
        return SESSION_TIMEOUT;
    }
}

上述代码中的CountDownLatch是因为连接时会耗时较长,所以需要添加一个计数器进行阻塞,否则会在connecting阶段就被释放了。

(3)创建分布式锁客户端

public class DistibutedLock {
    //根目录,客户端都会去此目录下创建临时有序子节点
    private final String ROOT_PATH = "/lock";
    //客户端
    private ZooKeeper zookeeper;
    //session超时时间
    private  int SESSION_TIMEOUT;
    //当前客户端创建有序节点的名称 
    private String lockId;

    private CountDownLatch countDownLatch = new CountDownLatch(1);
    
    public DistibutedLock() throws IOException, InterruptedException {
        this.zookeeper =ZookeeperClient.getInstance();
        this.SESSION_TIMEOUT = ZookeeperClient.getSessionTimeout();
    }

    public boolean lock(){

        try {
            //创建临时有序子节点
            lockId = zookeeper.create(ROOT_PATH+"/","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            System.out.println(Thread.currentThread().getName()+"创建节点"+lockId+",开始竞争锁");
            //获取/lock目录下所有子节点
            List<String> children = zookeeper.getChildren(ROOT_PATH, true);
            //用SortedSet对子节点从小到大进行排序
            SortedSet<String> sortedSet = new TreeSet<String>();
            for (String child : children) {
                sortedSet.add(ROOT_PATH+"/"+child);
            }
            //获取最小节点名称
            String first = sortedSet.first();
            //如果当前创建节点就是最小节点,则获取锁
            if (first.equals(lockId)) {
                System.out.println(Thread.currentThread().getName()+"获取锁"+lockId);
                return true;
            }
            //获取比当前id小的节点集合
            SortedSet<String> frontSet = sortedSet.headSet(lockId);
            if (!frontSet.isEmpty()) {
                //取集合中最后一个元素,也就是临近最小节点
                String last = frontSet.last();
                System.out.println(lockId+"监听"+last);
                //当前节点去监听上一个节点,当上一个节点被删除的时候
                //当前节点就可以获取锁 
                zookeeper.exists(last, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                            countDownLatch.countDown();
                        }
                    }
                });
                countDownLatch.await(SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
                System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
            }
            return true;

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return false;
    }

    //释放锁 
    public boolean unLock(){
        try {
            System.out.println(Thread.currentThread().getName() + "开始删除锁" + lockId);
            //删除当前节点
            zookeeper.delete(lockId, -1);
            return true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }
}

(4)测试代码

//等待器,当所有线程都执行到某个步骤才停止阻塞
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
//模拟十个线程去获取锁
for (int i = 0; i < 10; i++) {
    new Thread(()-> {
        DistibutedLock lock = null;
        try {
            lock = new DistibutedLock();
            cyclicBarrier.await();
            lock.lock();
            TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } finally {
            if(lock!=null){
                lock.unLock();
            }
        }
    }).start();
}

运行结果:按照创建顺序去获取锁

ZKClient

(1)引入jar包

(2)创建ZK客户端连接单例

public class ZKClientInstance {

    public static final String ZOOKEEPER_CONNECT="192.168.197.100:2181,192.168.197.110:2181,192.168.197.120:2181";

    private volatile static ZkClient instance;
    
    public static ZkClient getInstance(){
        if (instance == null) {
            synchronized (ZKClientInstance.class) {
                if (instance == null) {
                    instance = new ZkClient(ZOOKEEPER_CONNECT,5000,
                            5000,new SerializableSerializer());
                }
            }
        }
        return instance;
    }

}

(3)创建分布式锁客户端

public class ZKClientDisLock {

    private static final String ROOT_PATH = "/lock";

    private ZkClient zkClient;

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    private String lockId;

    public ZKClientDisLock(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    public boolean lock(){


        lockId = zkClient.createEphemeralSequential(ROOT_PATH + "/", "123");

        List<String> children = zkClient.getChildren(ROOT_PATH);
        SortedSet<String> sortedSet = new TreeSet<String>();

        for (String child : children) {
            sortedSet.add(ROOT_PATH+"/"+child);
        }
        String first = sortedSet.first();
        if (first.equals(lockId)) {
            System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
            return true;
        }

        SortedSet<String> frontSet = sortedSet.headSet(lockId);
        if (null != frontSet && frontSet.size() > 0) {
            String last = frontSet.last();
            IZkDataListener iZkDataListener = null;
            try {
                System.out.println(lockId + "监听" + last + "节点变化");
                iZkDataListener = new IZkDataListener() {
                    @Override
                    public void handleDataChange(String s, Object o) throws Exception {
                    }

                    @Override
                    public void handleDataDeleted(String s) throws Exception {
                        countDownLatch.countDown();
                    }
                };
                zkClient.subscribeDataChanges(last, iZkDataListener);
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
            } catch (Exception e) {

            }finally {
                zkClient.unsubscribeDataChanges(last,iZkDataListener);
            }
            return true;

        }

        return false;
    }

    public void unLock(){
        System.out.println(Thread.currentThread().getName()+ "释放锁"+ lockId + "-----");
        zkClient.delete(lockId);
    }
}

(3)测试代码

CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
  for (int i = 0; i < 10; i++) {
      int finalI = i;
      new Thread(()-> {
          ZKClientDisLock lock = null;
          try {
              lock = new ZKClientDisLock(ZKClientInstance.getInstance());
              cyclicBarrier.await();
              lock.lock();
              TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
          }  catch (InterruptedException e) {
              e.printStackTrace();
          } catch (BrokenBarrierException e) {
              e.printStackTrace();
          } finally {
              if(lock!=null){
                  lock.unLock();
              }
          }
      }).start();
  }

运行结果:

上述就是用java原生api以及ZKClient实现的分布式锁。

还有一种是用apache-curator实现,其可以实现可重入锁、排它锁、读写锁。之后有机会介绍curator的使用方法。

本文分享自微信公众号 - Liusy01(Liusy_01),作者:Liusy01

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-04-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 初识Netty

    【导读】我在两年前的时候就购买了《Netty权威指南》看了一下,不过没看懂哈哈哈哈,工作中也用不到,很快就忘了,直到前段时间在dy那边需要我重构一个TCP连接通...

    Liusy
  • Netty之HTTP协议应用开发

    HTTP(超文本传输协议)是建立在TCP传输协议上的应用层协议,是一个属于应用层的面向对象的协议,也是目前Web开发主流的协议。

    Liusy
  • Java锁机制

    上一篇简略说了一下Synchronized和Lock,今天就来说一下Java的锁机制。

    Liusy
  • SpringMVC请求参数和响应结果全局加密和解密

    前段时间在做一个对外的网关项目,涉及到加密和解密模块,这里详细分析解决方案和适用的场景。为了模拟真实的交互场景,先定制一下整个交互流程。第三方传输(包括请求和响...

    Throwable
  • 13.json解析

    六月的雨
  • Java IO

    java中涉及到的io流基本都是从以上四个抽象基类派生出来的,其子类都是以其父类的名字做后缀。

    万能青年
  • Java企业微信开发_07_JSSDK多图上传

     所有的JS接口只能在企业微信应用的可信域名下调用(包括子域名),可在企业微信的管理后台“我的应用”里设置应用可信域名。这个域名必须要通过ICP备案,不然jss...

    shirayner
  • 第八节:详细讲解Java中的异常处理情况与I/O流的介绍以及类集合框架

    大家好,我是 Vic,今天给大家带来详细讲解Java中的异常处理情况与I/O流的介绍以及类集合框架的概述,希望你们喜欢

    达达前端
  • 多人聊天室

    最近学完网络线程协议 ,因此写了一个用java编写的聊天室 话不多说 效果如图 ? 首先 创建服务器端 package com.yc.server...

    汤高
  • Java日常开发的21个坑,你踩过几个?

    最近看了极客时间的《Java业务开发常见错误100例》,再结合平时踩的一些代码坑,写写总结,希望对大家有帮助,感谢阅读~

    捡田螺的小男孩

扫码关注云+社区

领取腾讯云代金券