
ZooKeeper applications: barriers, queues, and distributed locks

WindWant
Published 2020-09-11 10:41:54
Included in the column: 后端码事

ZooKeeper utility class:

Get a connection instance; create a node; get child nodes; set node data; get node data; access control, and so on.

package org.windwant.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.List;

/**
 * zookeeper util
 */
public class ZookeeperUtil {
    private static final int SESSION_TIMEOUT = 30000;

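    /**
     * Create a connection from a connection string
     * @param domain connection string, e.g. host:port
     * @param w default watcher for the session
     * @return the ZooKeeper client, or null if the connection could not be created
     */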
    public static ZooKeeper getInstance(String domain, Watcher w){
        try {
            return new ZooKeeper(domain,SESSION_TIMEOUT, w);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }


    public static String createNode(ZooKeeper zk, String path, byte[] data){
        try {
            return zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static List<String> getChildrenNode(ZooKeeper zk, String path){
        try {
            return zk.getChildren(path, false);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static Stat setNodeData(ZooKeeper zk, String path, byte[] data, int version){
        try {
            return zk.setData(path, data, version);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static byte[] getNodeData(ZooKeeper zk, String path){
        try {
            return zk.getData(path, false, null);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void deleteNode(ZooKeeper zk, String path, int version){
        try {
            zk.delete(path, version);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public static void closeZk(ZooKeeper zk){
        try {
            if(zk != null) {
                zk.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void addAuth(ZooKeeper zk, String userName, String passwd){
        // The digest scheme takes the plain "user:password" pair; the server computes the hash itself
        zk.addAuthInfo("digest", (userName + ":" + passwd).getBytes());
    }

    public static void main(String[] args) {
//        try {
//            ZooKeeper zk = new ZooKeeper("localhost:2181", SESSION_TIMEOUT, null);
//            addAuth(zk, "roger", "123456");
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
        try {
            System.out.println(DigestAuthenticationProvider.generateDigest("roger:123456"));
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
    }
}
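
The main method above prints the digest form of "roger:123456", which is the value a digest ACL stores. As a rough illustration of what that string looks like, here is a JDK-only sketch. It assumes the default scheme, in which the digest is the user name followed by the Base64-encoded SHA-1 hash of the whole "user:password" string. DigestSketch and its digest method are hypothetical names and are not part of ZooKeeper.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class DigestSketch {
    // Hypothetical helper: "user:password" -> "user:" + base64(sha1("user:password"))
    static String digest(String idPassword) {
        // Everything before the first ':' is the user name
        String user = idPassword.split(":", 2)[0];
        try {
            byte[] sha1 = MessageDigest.getInstance("SHA-1")
                    .digest(idPassword.getBytes(StandardCharsets.UTF_8));
            return user + ":" + Base64.getEncoder().encodeToString(sha1);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a mandatory algorithm on every Java platform
            throw new IllegalStateException(e);
        }
    }
}
```

The result always has the shape user:hash, with a 28-character hash for the 20-byte SHA-1 output.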

Parent class: SyncPrimitive

Handles the ZooKeeper connection and the initialization of the root node. Implements ZooKeeper's Watcher.

package org.windwant.zookeeper;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SyncPrimitive implements Watcher {

    protected ZooKeeper zk = null;
    protected Integer mutex;

    SyncPrimitive(Integer mutex) {
        this.mutex = mutex;
    }

    /**
     * Initialize the ZooKeeper connection
     * @param domain connection string
     */
    protected void initZK(String domain){
        System.out.println(Thread.currentThread().getName() + ": init zookeeper...");
        try {
            zk = new ZooKeeper(domain, 30000, this);
            System.out.println(Thread.currentThread().getName() + ": zookeeper connected " + zk);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Initialize the application root node, guarding against concurrent callers
     * @param root
     */
    protected void initZKRootNode(String root){
        //concurrency control
        synchronized (mutex) {
            try {
                if (zk != null) {
                    if (zk.exists(root, false) != null) {
                        List<String> child = zk.getChildren(root, false);
                        if (child != null && !child.isEmpty()) {
                            //ZooKeeper multi operation; alternatively, a Transaction (which wraps multi) followed by commit()
                            List<Op> ops = new ArrayList<>();
                            child.forEach(c -> {
                                ops.add(Op.delete(root + "/" + c, -1));
                            });
                            List<OpResult> opRsts = zk.multi(ops);
                            System.out.println(Thread.currentThread().getName() + ": deleted child node success!");
                        }
                        zk.setData(root, String.valueOf(0).getBytes(), -1);
                        System.out.println(Thread.currentThread().getName() + ": app root node " + root + " init success! ");
                    } else {
                        //store "0" so that later reads can parse the root data as an integer
                        zk.create(root, String.valueOf(0).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                        System.out.println(Thread.currentThread().getName() + ": app root node " + root + " create success! ");
                    }
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void process(WatchedEvent event) {
        if(event.getState().equals(Event.KeeperState.SyncConnected)) {
        }
    }
}
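
One detail of the class above is worth spelling out: the mutex is a boxed Integer, and the subclasses pass their size as that value. Java caches boxed values from -128 to 127, so instances created with the same small size end up synchronizing on the same object. That is what serializes the root-node initialization across threads in one JVM, but it stops holding for larger values. The sketch below demonstrates the caching and shows a dedicated static lock as one way to make the sharing explicit. MutexSketch, INIT_LOCK, and runInit are hypothetical names for illustration only.

```java
final class MutexSketch {
    // Autoboxing a value in -128..127 always yields the same cached Integer object
    static boolean sameBoxedObject(int value) {
        Integer a = value;
        Integer b = value;
        return a == b; // reference comparison on purpose
    }

    // An explicit lock shared by all instances, independent of any numeric value
    private static final Object INIT_LOCK = new Object();
    private static int initRuns;

    // Runs the guarded section and returns how many times it has run so far
    static int runInit() {
        synchronized (INIT_LOCK) {
            initRuns++;
            return initRuns;
        }
    }
}
```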

ZooKeeper barrier: SyncPrimitiveBarrier

enter(): join the barrier queue
leave(): leave the barrier queue

package org.windwant.zookeeper;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class SyncPrimitiveBarrier extends SyncPrimitive {

    private String root;
    //barrier threshold
    private int size;
    private String name;

    /**
     * Barrier constructor
     *
     * @param domain
     * @param root
     * @param size
     */
    public SyncPrimitiveBarrier(String domain, String root, Integer size) {
        super(size);
        this.root = root;
        this.size = size;

        initZK(domain);
        initZKRootNode(root);

        // My node name
        try {
            name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
        } catch (UnknownHostException e) {
            System.out.println(e.toString());
        }

    }

    /**
     * Join barrier
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */

    boolean enter() throws KeeperException, InterruptedException{
        synchronized (mutex) {
            List<String> list = zk.getChildren(root, true);
            //if the current node count is below the threshold, create a node and enter the barrier
            if (list.size() < size) {
                System.out.println("node: " + list.size());
                this.name = zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
                if (list.size() + 1 == size) {
                    System.out.println("set data node size" + list.size());
                    zk.setData(root, String.valueOf(list.size() + 1).getBytes(), -1);
                }
                System.out.println(Thread.currentThread().getName() + ": " + name + " enter barrier!");
                return true;
            } else {
                //otherwise do not enter
                System.out.println(Thread.currentThread().getName() + ": " + name + " barrier full, inaccessible!");
                return false;
            }
        }
    }

    /**
     * Wait until all reach barrier
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */

    boolean leave() throws KeeperException, InterruptedException, UnsupportedEncodingException {
        while (true) {
            int data = Integer.parseInt(new String(zk.getData(root, false, new Stat()), "UTF-8"));
            if (data == size) {
                System.out.println("leave size: " + data);
                //leave
                zk.delete(name, -1);
                System.out.println(Thread.currentThread().getName() + ": " + name + " left barrier!");
                return true;
            } else {
                System.out.println(Thread.currentThread().getName() + ": " + name + " waiting for others!");
                Thread.sleep(1000);//check once per second
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                SyncPrimitiveBarrier syncPrimitiveBarrier = new SyncPrimitiveBarrier("localhost:2181", "/barrier_test", 3);
                boolean flag = false;
                try {
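                    //simulate the time needed to reach the barrier
                    Thread.sleep(ThreadLocalRandom.current().nextInt(1,5)*1000);
                    flag = syncPrimitiveBarrier.enter();

                    //try to leave only if this thread actually entered; otherwise name is not a node path
                    if (flag) {
                        syncPrimitiveBarrier.leave();
                    }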
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }).start();

        }
    }
}
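
The rule that enter() and leave() implement can be isolated from the ZooKeeper calls. The following is an in-memory model only, not a distributed barrier: a participant may enter while fewer than size participants have entered, and nobody may leave until the count reaches size. BarrierModel and mayLeave are hypothetical names. In the class above the count is the number of child nodes and the "full" signal is the root node's data.

```java
final class BarrierModel {
    private final int size;   // barrier threshold
    private int entered;      // participants that have entered so far

    BarrierModel(int size) {
        this.size = size;
    }

    // Mirrors enter(): admit a participant only while the barrier is not full
    synchronized boolean enter() {
        if (entered < size) {
            entered++;
            return true;
        }
        return false;
    }

    // Mirrors the check in leave(): leaving is allowed once the threshold was reached
    synchronized boolean mayLeave() {
        return entered == size;
    }
}
```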

ZooKeeper message queue: SyncPrimitiveQueue

produce(int i): create a message and put it in the queue
consume(): consume a message from the queue

package org.windwant.zookeeper;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Producer-Consumer queue
 */
public class SyncPrimitiveQueue extends SyncPrimitive {

    private String root;
    private int queueSize;

    /**
     * Constructor of producer-consumer queue
     *
     * @param domain
     * @param name
     */

    SyncPrimitiveQueue(String domain, String name, Integer queueSize) {
        super(queueSize);
        this.root = name;
        this.queueSize = queueSize;
        initZK(domain);
        initZKRootNode(root);
    }

    /**
     * Add element to the queue.
     *
     * @param i
     * @return
     */

    public boolean produce(int i) throws KeeperException, InterruptedException{
        synchronized (mutex) {
            List<String> children = zk.getChildren(root, false);
            //re-check after every wake-up; the limit is queueSize
            while(children != null && children.size()>=queueSize){
                System.out.println(Thread.currentThread().getName() + ": producer queue full, waiting for consuming");
                mutex.wait();
                children = zk.getChildren(root, false);
            }
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;

            b.putInt(i);
            value = b.array();
            String node = zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT_SEQUENTIAL);
            System.out.println(Thread.currentThread().getName() + ": produce value: " + node);
            mutex.notifyAll();//notify consumers
            return true;
        }
    }


    /**
     * Remove first element from the queue.
     *
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    int consume() throws KeeperException, InterruptedException{
        int retvalue = -1;
        Stat stat = null;

        // Get the first element available
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() == 0) {
                    System.out.println(Thread.currentThread().getName() + ": resource not available, waiting for produce!");
                    mutex.wait();
                } else {
                    list.sort((String s1, String s2) -> s1.compareTo(s2)); //consume the node with the lowest sequence number
                    String dest = list.get(0);
                    System.out.println(Thread.currentThread().getName() + ": consume value: " + root + "/" + dest);
                    byte[] b = zk.getData(root + "/" + dest,
                            false, stat);
                    zk.delete(root + "/" + dest, 0); //delete after consuming
                    ByteBuffer buffer = ByteBuffer.wrap(b);
                    retvalue = buffer.getInt();
                    mutex.notifyAll();
                    return retvalue;
                }
            }
        }
    }

    public static void main(String[] args) {
        SyncPrimitiveQueue syncPrimitiveQueue = new SyncPrimitiveQueue("localhost:2181", "/queue_test", 10);
        //producer: pauses a random 0 to 2 seconds between items
        new Thread(() -> {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                try {
                    syncPrimitiveQueue.produce(i);
                    Thread.sleep(ThreadLocalRandom.current().nextInt(0, 3)*1000);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //consumer: starts 10 seconds later and pauses a random 0 to 2 seconds between items
        new Thread(() -> {
            for (int i = 0; i < Integer.MAX_VALUE ; i++) {
                try {
                    syncPrimitiveQueue.consume();
                    Thread.sleep(ThreadLocalRandom.current().nextInt(0, 3)*1000);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
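
consume() picks the oldest element by sorting the child names as strings. That works because ZooKeeper appends a 10-digit, zero-padded counter to sequential node names, so names with the same prefix sort in creation order. The JDK-only sketch below makes the ordering explicit by parsing the counter instead. SequenceNodes, sequenceOf, and oldest are hypothetical helper names, and the sample node names in the usage note are made up.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class SequenceNodes {
    // Sequential node names end in a 10-digit, zero-padded counter
    static int sequenceOf(String nodeName) {
        return Integer.parseInt(nodeName.substring(nodeName.length() - 10));
    }

    // Returns the child with the lowest counter; throws NoSuchElementException on an empty list
    static String oldest(List<String> children) {
        return Collections.min(children, Comparator.comparingInt(SequenceNodes::sequenceOf));
    }
}
```

For example, given the children element0000000010, element0000000002, and element0000000007, oldest returns element0000000002.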

ZooKeeper distributed lock: SynZookeeperLock

getInstance(String domain): get a ZooKeeper instance
tryLock(String domain, String path, byte[] data, CountDownLatch c): try to acquire the distributed lock

package org.windwant.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * ZooKeeper distributed lock
 */
public class SynZookeeperLock {
    private static final int SESSION_TIMEOUT = 30000;

    public static ZooKeeper getInstance(String domain){
        try {
            CountDownLatch c = new CountDownLatch(1);
            ZooKeeper zk = new ZooKeeper(domain, SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                        c.countDown(); // release the thread waiting for the connection
                    }
                }
            });
            //block until the connection is established
            c.await();
            return zk;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * Acquire the distributed lock.
     * Uses an ephemeral node so the lock is not left behind if the holding process dies before releasing it.
     * @param domain
     * @param path
     * @param data
     * @param c
     */
    public static void tryLock(String domain, String path, byte[] data, CountDownLatch c){
        //use a new session for every lock attempt
        ZooKeeper zk = getInstance(domain);
        zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path1, ctx, name) -> {
            //OK (0): the node was created, so the lock is acquired
            if (rc == KeeperException.Code.OK.intValue()) {
                System.out.println(Thread.currentThread().getName() + ":result " + rc + " lock " + path + ", created!");
                try {
                    //simulate releasing the lock 3 seconds after acquiring it
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + ":task complete,try release lock!");
                    zk.delete(path, -1, (rc1, path2, ctx1) -> {
                        if(rc1 == 0){
                            System.out.println(Thread.currentThread().getName() + ":lock released!");
                        }
                    }, null);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //release the waiting caller
                    c.countDown();
                }
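            } else if(rc == KeeperException.Code.NODEEXISTS.intValue()) {//NODEEXISTS (-110): another process holds the lock, so set a watch and wait
                System.out.println(Thread.currentThread().getName() + ":result " + rc + " lock " + path + " already created, waiting!");
                try {
                    boolean lockStillHeld = zk.exists(path, event -> {
                        //on the lock node's deletion event, try to acquire the lock again
                        if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
                            System.out.println(Thread.currentThread().getName() + ":get node deleted event! try lock!");
                            //close the connection so the server's connection limit is not exhausted
                            try {
                                zk.close();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            SynZookeeperLock.tryLock(domain, path, data, c);
                            c.countDown();
                        }
                    }) != null;
                    if (!lockStillHeld) {
                        //the lock was released before the watch was set, so no deletion event will arrive: retry now
                        zk.close();
                        SynZookeeperLock.tryLock(domain, path, data, c);
                        c.countDown();
                    }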
                } catch (KeeperException e) {
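                    //includes ConnectionLossException (network or server failure); after the client reconnects, check whether the earlier request took effect and whether it must be reissued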
                    e.printStackTrace();
                    c.countDown();
                } catch (InterruptedException e) {
                    //thread interrupted; the request is aborted
                    e.printStackTrace();
                    c.countDown();
                }
            }else {
                //e.g. -4 (CONNECTIONLOSS) or -112 (SESSIONEXPIRED)
                System.out.println(Thread.currentThread().getName() + ": connection lose or session invalid");
                c.countDown();
//                tryLock(domain, path, data, c);
            }
        }, new Object());
        try {
            //block until a result is available
            c.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            c.countDown();
        }
    }

    public static void main(String[] args) {
        String lockPath = "/testlock";
        byte[] lock = "lock".getBytes();
        String domain = "127.0.0.1:2181";
        //threads competing for the lock; mind the server's maximum connection limit
        for (int i = 0; i < 20; i++) {
            Thread tmp = new Thread( () -> tryLock(domain, lockPath, lock, new CountDownLatch(1)));
            tmp.start();
        }
    }
}
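
The create callback in tryLock branches on a numeric result code: 0 means the node was created, -110 means it already exists, and codes such as -4 and -112 indicate a lost connection or an expired session. As a small JDK-only sketch of that decision, the class below maps a code to the action the lock takes. CreateResult, Outcome, and classify are hypothetical names. The constants mirror the values of KeeperException.Code, and real code would use that enum directly.

```java
final class CreateResult {
    enum Outcome { ACQUIRED, WAIT_FOR_RELEASE, GIVE_UP }

    // Numeric values as defined by KeeperException.Code
    static final int OK = 0;
    static final int CONNECTION_LOSS = -4;
    static final int NODE_EXISTS = -110;
    static final int SESSION_EXPIRED = -112;

    // Maps the result code of the create call to the branch taken in tryLock
    static Outcome classify(int rc) {
        switch (rc) {
            case OK:
                return Outcome.ACQUIRED;          // node created: this caller holds the lock
            case NODE_EXISTS:
                return Outcome.WAIT_FOR_RELEASE;  // someone else holds it: watch and wait
            default:
                return Outcome.GIVE_UP;           // connection loss, session expiry, or anything else
        }
    }
}
```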

Project repository: https://github.com/windwant/windwant-demo/tree/master/zookeeper-service

Originally published: 2016-10-20
