zookeeper学习系列:三、利用zookeeper做选举和锁

之前只理解zk可以做命名,配置服务,现在学习下他怎么用作选举和锁,进一步还可构建master-slave模式的分布式系统。

为什么叫Zoo?“因为要协调的分布式系统是一个动物园”。

ZooKeeper是一个中性化的Service,用于管理配置信息、命名、提供分布式同步,还能组合Service。所有这些种类的Service都会在分布式应用程序中使用到。每次编写这些Service都会涉及大量的修bug和竞争情况。正因为这种编写这些Service有一定难度,所以通常都会忽视它们,这就使得在应用程序有变化时变得难以管理应用程序。即使处理得当,实现这些服务的不同方法也会使得部署应用程序变得难以管理。

下边代码是参考文献的java版本,通过service来协调各个独立的PHP脚本,并让它们同意某个成为Leader(所以称作Leader选举)。当Leader退出(或崩溃)时,worker可检测到并再选出新的leader。通过这种方式即可理解一般的master-slave结构分布式系统是如何实现如何调度的,zk是个好东西。

首先需要了解创建节点的模式:

PERSISTENT:持久化目录节点,这个目录节点存储的数据不会丢失;

PERSISTENT_SEQUENTIAL:顺序自动编号的目录节点,这种目 录节点会根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名;

EPHEMERAL:临时目录节点,一旦创建这个节点的客户端与服务器端口也就是 session 超时,这种节点会被自动删除;

EPHEMERAL_SEQUENTIAL:临时自动编号节点。

临时节点在leader选举、锁服务中起着非常重要的作用。 

一、选举

程序逻辑:

1)首先创建根节点/cluster,并创建自身子节点,以 /cluster/w- 为前缀,使用临时自动编号节点模式创建节点

2)获取/cluster的所有子节点并排序,当发现自身是第一个节点时,则自我选举为leader,否则认定为follower

3)注册监听事件,当/cluster里前一个节点有变动时,回到2)

这样,便实现了自动选举,当有节点在timeout时段后不可用时,自动产生新的leader,也可根据当前节点数进行预警。

package zookeeper;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
 * Created with IntelliJ IDEA.
 *
 * @author guanpu
 *         Date: 14-10-22
 *         Time: 下午5:11
 *         To change this template use File | Settings | File Templates.
 */
public class Worker extends ZooKeeper implements Runnable, Watcher {
    public static final String NODE_NAME = "/cluster";
    public String znode;
    private boolean leader;

    public Worker(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
        super(connectString, sessionTimeout, watcher);
    }

    public boolean register() throws InterruptedException, KeeperException {
        if (this.exists(NODE_NAME, null) == null) {
            this.create(NODE_NAME, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
        znode = this.create(NODE_NAME + "/w-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        znode = znode.replace(NODE_NAME + "/", "");
        String node = watchPrevious();
        if (node.equals(znode)) {
            System.out.println("nobody here ,i am leader");
            leader = true;
        } else {
            System.out.println("i am watching");
        }
        return true;
    }

    private String watchPrevious() throws InterruptedException, KeeperException {
        List<String> works = this.getChildren(NODE_NAME, this);
        Collections.sort(works);
        System.out.println(works);
        int i = 0;
        for (String work : works) {
            if (znode.equals(work)) {
                if (i > 0) {
                   //this.getData(NODE_NAME + "/" + works.get(i - 1), this, null);
                    return works.get(i - 1);
                }
                return works.get(0);
            }
        }
        return "";

    }

    @Override
    public void run() {
        try {
            this.register();
        } catch (InterruptedException e) {
        } catch (KeeperException e) {
        }
        while (true) {
            try {
                if (leader) {
                    System.out.println("leading");
                } else {
                    System.out.println("following");
                }
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) {
        try {
            String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13";
            new Thread(new Worker(hostPort, 3000, null)).start();
        } catch (IOException e) {
        }
    }


    @Override
    public void process(WatchedEvent event) {
        String t = String.format("hello event! type=%s, stat=%s, path=%s", event.getType(), event.getState(), event.getPath());
        System.out.println(t);
        System.out.println("hello ,my cluster id is :"+znode);
        String node = "";
        try {
            node = this.watchPrevious();
        } catch (InterruptedException e) {
        } catch (KeeperException e) {
        }

        if (node.equals(znode)) {
            System.out.println("process: nobody here ,i am leader");
            leader = true;
        } else {
            System.out.println("process: i am watching");
        }
    }
}

 启动至少三个终端,模拟Leader崩溃的情形。使用Ctrl+c或其他方法退出第一个脚本。刚开始不会有任何变化,worker可以继续工作。后来,ZooKeeper会发现超时,并选举出新的leader。

php移植到java有两个问题,第一个是watcher注册,第一次父类初始化未完成时不能调用自身作为watcher,会报一次watcher调用空指针。

第二个问题:

 this.getData(NODE_NAME + "/" + works.get(i - 1), this, null);

这个不生效,看方法注释是当改动和移除节点时,触发watcher的process,但实验中并未触发,在java里系统的自动删除并不归类在这两个操作之内?

php版本的是正常的,作为遗留问题。为了程序正常运行,更改为 List<String> works = this.getChildren(NODE_NAME, this);   当子节点有变动时执行process方法。  但这样会导致从众效应,当集群服务器众多且带宽延时较大时候会很明显,leader的状态变化会引起所有follower的变化,follower之一短连,也会导致整个集群去响应这个变化。

二、锁 

加锁:

1)zk调用create()方法创建一个路径格式为"_locknode_/lock-"的节点,类型为sequence和ephemeral,临时节点且顺序编号

2)在创建的锁节点上调用getChildren()方法,以获取锁目录下最小编号节点,且不设置watch

3)如果步骤2获得的节点是步骤1创建的节点,那么客户端获得锁,然后退出操作

4)客户端在锁目录上调用exists()方法,设置watch来监视锁目录下序号相对自己小的连续临时节点的状态

5)监视节点状态发生变化,则跳到步骤2,继续后续操作,直到退出锁竞争。

解锁:

将加锁操作步骤1中创建的临时节点删除即可。

参考文献:

http://anykoro.sinaapp.com/2013/04/05/使用apache-zookeeper分布式部署php应用程序/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏张戈的专栏

Linux运维工程师:30道面试题整理

前段时间,我在准备面试的时搜到的一套 Linux 运维工程师面试题,感觉比较全面,一直保存在草稿,刚在整理后台时翻了出来,干脆就发出来好了,以备不时之需。 1....

2.3K50
来自专栏我和PYTHON有个约会

Django-REST:002-API接口序列化

在这样的处理风格下,目前更加适合项目开发并且成本较低的一种比较流行的开发模式:项目的前后端分离结构模型的优势更加突出,通过RESTful风格指定的格式定义不同的...

13420
来自专栏测试开发架构之路

JMeter测试工具.jmx文件详解

摘要:了解.jmx文件格式类型,对jmeter二次开发与拓展有很大的帮助,当然也可以利用python对其进行一些处理(生成一些测试用例,对jmx文件进行 ”增删...

34340
来自专栏林德熙的博客

dot net core 使用 IPC 进程通信 原理例子序列化

一般都是使用 WCF 或 remoting 做远程通信,但是 dot net core 不支持 WCF 所以暂时我就只能使用 管道通信。

11920
来自专栏公众号_薛勤的博客

深入理解Spring Boot数据源与连接池原理

在使用Spring Boot数据源之前,我们一般会导入相关依赖。其中数据源核心依赖就是spring‐boot‐starter‐jdbc 如下

2.1K30
来自专栏Java帮帮-微信公众号-技术文章全总结

Web-第三十一天 WebService学习【悟空教程】

简单的网络应用使用单一语言写成,它的唯一外部程序就是它所依赖的数据库。大家想想是不是这样呢?

24240
来自专栏佳爷的后花媛

php基础(一)

static 是静态变量,在局部函数中存在且只初始化一次,使用过后再次使用会使用上次执行的结果; 作为计数,程序内部缓存,单例模式中都有用到。

28020
来自专栏编程软文

redis的使用和安装,redis基础和高级部分

41570
来自专栏java一日一条

Redis 和 Memcached 的区别详解

Redis的作者Salvatore Sanfilippo曾经对这两种基于内存的数据存储系统进行过比较:

7910
来自专栏点滴积累

Jupyter(Python)中无法使用Cache原理分析

前言 最近需要在Jupyter中写一个类库,其中有一个文件实现从数据库中读取空间数据并加载为Feature对象,Feature对象是cartopy封装的geom...

35660

扫码关注云+社区

领取腾讯云代金券