使用Node.js实现一个简单的ZooKeeper客户端

什么是ZooKeeper

Zookeeper 是一个分布式的、开源的协调服务,用在分布式应用程序中。它提出了一组简单的原语,分布式应用程序可以基于这些原语之上构建更高层的分布式服务用于实现同步、配置管理、分组和命名等。Zookeeper 设计的容易进行编程,它使用一种类似于文件系统的目录树结构的数据模型,以 java 方式运行,有 java 和 c 的绑定(binding)。

分布式系统中的协调服务总所周知地难于正确实现,尤其容易产生诸如争用条件 (race conditions)、死锁(deadlock) 等错误。Zookeeper 背后的动机就是减轻分布式应用程序从头做起实现协调服务的难度。

数据模型

Zookeeper 会维护一个具有层次关系的数据结构,它非常类似于一个标准的文件系统,如下图所示:

Zookeeper 这种数据结构有如下这些特点:

  1. 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1
  2. znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录
  3. znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
  4. znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
  5. znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2
  6. znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的。

简洁的API Zookeeper 的设计目标之一就是提供简单的编程接口。于是,它只提供了以下的操作:

  • create : 在(命名空间)树的一个特定地址上创建一个节点
  • delete : 删除一个节点
  • exists : 检测在一个地址上是否存在节点
  • get data : 从节点读取数据
  • set data :将数据写入节点
  • get children :检索子节点列表
  • sync : 等待数据传播完成

谁在用?

如小米公司的米聊,其后台就采用了ZooKeeper作为分布式服务的统一协作系统。而阿里公司的开发人员也广泛使用ZooKeeper,并对其进行了适当修改,开源了一款TaoKeeper软件,以适应自身业务需要。另外还包括Apache HBase、Apache Kafka、Facebook Message等产品也都使用了ZooKeeper。

应用场景

  • 数据量比较小,但对数据可靠性要求很高的场景,比如管理分布式应用的协作数据。不能做什么
  • ZooKeeper不合适做海量存储,因为它主要用来管理分布式应用协作的关键数据。对于海量数据,不同的应用有不同的需求,如对一致性和持久性的不同需求,所以在设计应用时,最佳实践应该将应用数据和协作数据分开,况且对于海量数据我们的选择很多,如数据库或者分布式文件系统等。
  • 不要让ZooKeeper Server来管理应用程序的缓存,而应该把这些任务交给ZooKeeper客户端,因为这样会导致ZooKeeper的设计更加复杂。比如,让ZooKeeper来管理缓存失效,可能会导致ZooKeeper在运行时,停滞在等待客户端确认一个缓存失效的请求上,因为在进行所有写操作之前,都需要确认对应的缓存数据是否失效。

Node.js应用与ZooKeeper Server进行通信

那么当Node.js应用作为整个异构分布式系统中的一环,需要作为客户端去操作ZooKeeper Server上的znode时,应该如何实现? 说实话,上文介绍了这么多ZooKeeper的原理,其实作为客户端只需要单纯的把znode作为文件来操作就好,并且可以监听znode的改变,十分方便。本文只描述怎样使用Node.js实现ZooKeeper客户端角色。

node-zookeeper

node-zookeeper是ZooKeeper的一个Node.js客户端实现,这个模块是基于ZooKeeper原生提供的C API来实现的。

下载 npm install zookeeper

栗子

var ZooKeeper = require ("zookeeper");
var zk = new ZooKeeper({
  connect: "localhost:8888" // zk server的服务器地址和监听的端口号,timeout: 200000 // 以毫秒为单位
 ,debug_level: ZooKeeper.ZOO_LOG_LEVEL_WARN 
 ,host_order_deterministic: false 
});
zk.connect(function (err) {
    if(err) throw err;
    console.log ("zk session established, id=%s", zk.client_id);
    zk.a_create ("/node.js1", "some value", ZooKeeper.ZOO_SEQUENCE | ZooKeeper.ZOO_EPHEMERAL, function (rc, error, path)  {
        if (rc != 0) {
            console.log ("zk node create result: %d, error: '%s', path=%s", rc, error, path);
        } else {
            console.log ("created zk node %s", path);
            process.nextTick(function () {
                zk.close ();
            });
        }
    });
});

其中:

  • connect: 包含主机名和ZooKeeper服务器的端口。
  • timeout:以毫秒为单位,表示ZooKeeper等待客户端通信的最长时间,之后会声明会话已死亡。ZooKeeper的会话一般设置超时时间5-10秒。
  • debug_level:设置日志的输出级别,有四种级别:ZOO_LOG_LEVEL_ERROR, ZOO_LOG_LEVEL_WARN, ZOO_LOG_LEVEL_INFO, ZOO_LOG_LEVEL_DEBUG
  • host_order_deterministic: 初始化zk客户端实例后,该实例是否是按确定顺序去连接ZooKeeper Server集群中的主机,直到连接成功,或者该会话被断开。

常见API:

  • connect():连接ZooKeeper Server
  • a_create (path, data, flags, path_cb): 创建一个znode,并赋值,可以决定这个znode的节点类型(永久、临时、永久有序、临时有序)
  • a_get(path, watch, data_cb): path: 我们想要获取数据的zonde节点路径。 watch: 表示我们是否想要监听该节点后续的数据变更。data_cb(rc ,error, stat, data): rc:return code,0为成功。 error:错误信息。stat:znode的元数据信息。data: znode中的数据。
  • a_set( path, data, version, stat_cb ): 需要注意的是,ZooKeeper并不允许局部写入或读取znode的数据,当设置一个znode节点的数据或读取时,znode节点的内容或被整个替换或全部读取出来。path: 我们想要设置数据的zonde节点路径。data:我们想要设置的数据,一个znode节点可以包含任何数据,数据存储为字节数组(byte array)。字节数组的具体格式特定于每个应用的实现,ZooKeeper不直接提供解析的支持,用户可以使用如Protobuf、Thrift、Avro或MessagePack等序列化协议来处理保存在znode中的数据格式,一般UTF-8编码的字符串就够用了。version:znode的version,从stat中抽取出来的。data_cb(rc, error, stat): 设置数据的回调。
  • close(): 关闭客户端连接
  • a_exists(path, watch, stat_cb): 判断znode是否存在
  • adelete( path, version, voidcb ):删除znode,结尾加上""是为了不和保留字"delete"冲突。。。

实现对指定znode节点数据进行CURD的ORM

'use strict'

const ZooKeeper = require('zookeeper');
const logger = require('../logger/index.js'); // 打日志的工具
const Promise = require('bluebird');
const _ = require('lodash');
let node_env = process.env.NODE_ENV ? process.env.NODE_ENV: 'development';
let connect = node_env === 'development' ? 'zktest.imweb.com:8888' : 'zk.imweb.oa.com:8888';
let timeout = 200000; // 单位毫秒
let path = node_env === 'development' ? '/zk_test/blackList' : '/zk/blackList';
let debug_level = ZooKeeper.ZOO_LOG_LEVEL_WARN;
let host_order_deterministic = false;
let defaultInitOpt = {
    connect,
    timeout,
    debug_level,
    host_order_deterministic
};

class ZK {
    constructor(opt) {
        this.opt = opt;
        this._initZook();
    }

    _initZook() {
        this.zookeeper = new ZooKeeper(this.opt.initOpt || defaultInitOpt);
    }

    /**
     * [get zookeeper blackList]
     * @return {[type]}            [description]
     */
    get() {
        return new Promise((resolve, reject) => {
            let self = this;
            self.zookeeper.connect(function(error) {
                if (error) {
                    reject(error);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(err);
                    } else {
                        logger.info('get zk node: ' + data)
                        resolve(data);                        
                    }
                    process.nextTick(() => {self.zookeeper.close();});
                })
            });
        });
    }

    /**
     * [set zookeeper black_list]
     * @param {object}   opt: 
     * {
     *     380533076: {
     *         "anchor_uin": 380533076,
     *         "expired_time": 1462876279
     *     },
     *     380533077: {
     *         "anchor_uin": 380533077,
     *         "expired_time": 1462876279
     *     },
     * }
     */
    set(opt) {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('get zk node %s', data);
                        console.log('stat: ', stat);
                        console.log('data: ', typeof data);
                        try {
                            zkData = JSON.parse(data);
                        } catch (e) {
                            reject(e);
                            return;
                        }

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        _.extend(zkData.data, opt);
                        let currVersion = stat.version;
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node set result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('set zk node succ!');
                                resolve(stat);

                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        });
    }

    /**
     * [delete zookeeper znode]
     * @param  {array}   keys     [要删除的黑名单的QQ号]
     * @return {[type]}            [description]
     */
    delete(keys) {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('get zk node %s', data);
                        console.log('stat: ', stat);
                        console.log('data: ', typeof data);
                        try {
                            zkData = JSON.parse(data);
                        } catch (e) {
                            reject(e);
                            return;
                        }

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        for (let key of keys) {
                            delete zkData.data[key];
                        }

                        let currVersion = stat.version; // 只对这个znode被读取时的这个ersion,否则会抛错。
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node set result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('set zk node succ!');
                                resolve(stat);
                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        })

    }

    /**
     * [add description]
     * @param {[type]}   opt      [description]
     */
    add(opt) {
        // zookeeper只能以覆盖的方式set
        return this.set(opt);
    }

    clear() {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('stat: ', stat);

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        zkData.data = '';
                        let currVersion = stat.version;
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node clear result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('clear zk node succ!');
                                resolve(stat);
                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        });
    }
}

module.exports = ZK;

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏杨建荣的学习笔记

通过外部表改进一个繁琐的大查询 (r8笔记第32天)

今天处理了一个比较有意思的案例,说是有意思,因为涉及多个部门,但是哪个部门似乎都不愿意接。最后还是用了一些巧力,化干戈为玉帛。 问题的背景是这样的,业务部门需要...

3299
来自专栏张善友的专栏

HAProxy简介

HAProxy是一款提供高可用性、负载均衡以及基于TCP(第四层)和HTTP(第七层)应用的代理软件,HAProxy是完全免费的、借助HAProxy可以快速并且...

24210
来自专栏老码农专栏

原 荐 ActFramework 在 TEB

1162
来自专栏杨建荣的学习笔记

巧用闪回数据库来查看历史数据 (r10笔记第47天)

国庆期间有一个例行维护的任务,需要在大早上7点起来,先根据业务指定的SQL查出指定数据,然后运行一个存储过程来更新数据。 查出来的这部分数据需要...

3124
来自专栏Hadoop实操

如何给Hadoop集群划分角色

Fayson在之前的文章中介绍过《CDH网络要求(Lenovo参考架构)》,《如何为Hadoop集群选择正确的硬件》和《CDH安装前置准备》,而我们在搭建Had...

4867
来自专栏散尽浮华

mysql主从同步(5)-同步延迟状态考量(seconds_behind_master和pt-heartbea)

一般情况下,我们是通过"show slave status \G;"提供的Seconds_Behind_Master值来衡量mysql主从同步的延迟情况。具体说...

2748
来自专栏java架构师

Hadoop学习13--zookeeper相关

zookeeper要保证各个server之间同步,实现同步的协议是zab协议。此协议有两种模式:恢复模式(选主)和广播模式(同步)。 服务启动或者leader崩...

3278
来自专栏Ryan Miao

CentOS(linux)安装PostgreSQL

PostgreSQL是一个功能强大的开源数据库系统。经过长达15年以上的积极开发和不断改进,PostgreSQL已在可靠性、稳定性、数据一致性等获得了业内极高的...

902
来自专栏逸鹏说道

KVM基于内核的虚拟机概念理解与客户机浅析

作为一个KVM的学习者,如果你想要自己完善一个KVM样品级的解决方案,仅仅学会图形化界面使用和简单的配置(详情见上一篇文章)是远远不够的。在上文中感谢@laow...

3644
来自专栏散尽浮华

split-brain 脑裂问题(Keepalived)

脑裂(split-brain) 指在一个高可用(HA)系统中,当联系着的两个节点断开联系时,本来为一个整体的系统,分裂为两个独立节点,这时两个节点开始争抢共享资...

1.2K5

扫码关注云+社区