使用Node.js实现一个简单的ZooKeeper客户端

什么是ZooKeeper

Zookeeper 是一个分布式的、开源的协调服务,用在分布式应用程序中。它提出了一组简单的原语,分布式应用程序可以基于这些原语之上构建更高层的分布式服务用于实现同步、配置管理、分组和命名等。Zookeeper 设计的容易进行编程,它使用一种类似于文件系统的目录树结构的数据模型,以 java 方式运行,有 java 和 c 的绑定(binding)。

分布式系统中的协调服务总所周知地难于正确实现,尤其容易产生诸如争用条件 (race conditions)、死锁(deadlock) 等错误。Zookeeper 背后的动机就是减轻分布式应用程序从头做起实现协调服务的难度。

数据模型

Zookeeper 会维护一个具有层次关系的数据结构,它非常类似于一个标准的文件系统,如下图所示:

Zookeeper 这种数据结构有如下这些特点:

  1. 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1
  2. znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录
  3. znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
  4. znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
  5. znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2
  6. znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的。

简洁的API Zookeeper 的设计目标之一就是提供简单的编程接口。于是,它只提供了以下的操作:

  • create : 在(命名空间)树的一个特定地址上创建一个节点
  • delete : 删除一个节点
  • exists : 检测在一个地址上是否存在节点
  • get data : 从节点读取数据
  • set data :将数据写入节点
  • get children :检索子节点列表
  • sync : 等待数据传播完成

谁在用?

如小米公司的米聊,其后台就采用了ZooKeeper作为分布式服务的统一协作系统。而阿里公司的开发人员也广泛使用ZooKeeper,并对其进行了适当修改,开源了一款TaoKeeper软件,以适应自身业务需要。另外还包括Apache HBase、Apache Kafka、Facebook Message等产品也都使用了ZooKeeper。

应用场景

  • 数据量比较小,但对数据可靠性要求很高的场景,比如管理分布式应用的协作数据。不能做什么
  • ZooKeeper不合适做海量存储,因为它主要用来管理分布式应用协作的关键数据。对于海量数据,不同的应用有不同的需求,如对一致性和持久性的不同需求,所以在设计应用时,最佳实践应该将应用数据和协作数据分开,况且对于海量数据我们的选择很多,如数据库或者分布式文件系统等。
  • 不要让ZooKeeper Server来管理应用程序的缓存,而应该把这些任务交给ZooKeeper客户端,因为这样会导致ZooKeeper的设计更加复杂。比如,让ZooKeeper来管理缓存失效,可能会导致ZooKeeper在运行时,停滞在等待客户端确认一个缓存失效的请求上,因为在进行所有写操作之前,都需要确认对应的缓存数据是否失效。

Node.js应用与ZooKeeper Server进行通信

那么当Node.js应用作为整个异构分布式系统中的一环,需要作为客户端去操作ZooKeeper Server上的znode时,应该如何实现? 说实话,上文介绍了这么多ZooKeeper的原理,其实作为客户端只需要单纯的把znode作为文件来操作就好,并且可以监听znode的改变,十分方便。本文只描述怎样使用Node.js实现ZooKeeper客户端角色。

node-zookeeper

node-zookeeper是ZooKeeper的一个Node.js客户端实现,这个模块是基于ZooKeeper原生提供的C API来实现的。

下载 npm install zookeeper

栗子

var ZooKeeper = require ("zookeeper");
var zk = new ZooKeeper({
  connect: "localhost:8888" // zk server的服务器地址和监听的端口号,timeout: 200000 // 以毫秒为单位
 ,debug_level: ZooKeeper.ZOO_LOG_LEVEL_WARN 
 ,host_order_deterministic: false 
});
zk.connect(function (err) {
    if(err) throw err;
    console.log ("zk session established, id=%s", zk.client_id);
    zk.a_create ("/node.js1", "some value", ZooKeeper.ZOO_SEQUENCE | ZooKeeper.ZOO_EPHEMERAL, function (rc, error, path)  {
        if (rc != 0) {
            console.log ("zk node create result: %d, error: '%s', path=%s", rc, error, path);
        } else {
            console.log ("created zk node %s", path);
            process.nextTick(function () {
                zk.close ();
            });
        }
    });
});

其中:

  • connect: 包含主机名和ZooKeeper服务器的端口。
  • timeout:以毫秒为单位,表示ZooKeeper等待客户端通信的最长时间,之后会声明会话已死亡。ZooKeeper的会话一般设置超时时间5-10秒。
  • debug_level:设置日志的输出级别,有四种级别:ZOO_LOG_LEVEL_ERROR, ZOO_LOG_LEVEL_WARN, ZOO_LOG_LEVEL_INFO, ZOO_LOG_LEVEL_DEBUG
  • host_order_deterministic: 初始化zk客户端实例后,该实例是否是按确定顺序去连接ZooKeeper Server集群中的主机,直到连接成功,或者该会话被断开。

常见API:

  • connect():连接ZooKeeper Server
  • a_create (path, data, flags, path_cb): 创建一个znode,并赋值,可以决定这个znode的节点类型(永久、临时、永久有序、临时有序)
  • a_get(path, watch, data_cb): path: 我们想要获取数据的zonde节点路径。 watch: 表示我们是否想要监听该节点后续的数据变更。data_cb(rc ,error, stat, data): rc:return code,0为成功。 error:错误信息。stat:znode的元数据信息。data: znode中的数据。
  • a_set( path, data, version, stat_cb ): 需要注意的是,ZooKeeper并不允许局部写入或读取znode的数据,当设置一个znode节点的数据或读取时,znode节点的内容或被整个替换或全部读取出来。path: 我们想要设置数据的zonde节点路径。data:我们想要设置的数据,一个znode节点可以包含任何数据,数据存储为字节数组(byte array)。字节数组的具体格式特定于每个应用的实现,ZooKeeper不直接提供解析的支持,用户可以使用如Protobuf、Thrift、Avro或MessagePack等序列化协议来处理保存在znode中的数据格式,一般UTF-8编码的字符串就够用了。version:znode的version,从stat中抽取出来的。data_cb(rc, error, stat): 设置数据的回调。
  • close(): 关闭客户端连接
  • a_exists(path, watch, stat_cb): 判断znode是否存在
  • adelete( path, version, voidcb ):删除znode,结尾加上""是为了不和保留字"delete"冲突。。。

实现对指定znode节点数据进行CURD的ORM

'use strict'

const ZooKeeper = require('zookeeper');
const logger = require('../logger/index.js'); // 打日志的工具
const Promise = require('bluebird');
const _ = require('lodash');
let node_env = process.env.NODE_ENV ? process.env.NODE_ENV: 'development';
let connect = node_env === 'development' ? 'zktest.imweb.com:8888' : 'zk.imweb.oa.com:8888';
let timeout = 200000; // 单位毫秒
let path = node_env === 'development' ? '/zk_test/blackList' : '/zk/blackList';
let debug_level = ZooKeeper.ZOO_LOG_LEVEL_WARN;
let host_order_deterministic = false;
let defaultInitOpt = {
    connect,
    timeout,
    debug_level,
    host_order_deterministic
};

class ZK {
    constructor(opt) {
        this.opt = opt;
        this._initZook();
    }

    _initZook() {
        this.zookeeper = new ZooKeeper(this.opt.initOpt || defaultInitOpt);
    }

    /**
     * [get zookeeper blackList]
     * @return {[type]}            [description]
     */
    get() {
        return new Promise((resolve, reject) => {
            let self = this;
            self.zookeeper.connect(function(error) {
                if (error) {
                    reject(error);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(err);
                    } else {
                        logger.info('get zk node: ' + data)
                        resolve(data);                        
                    }
                    process.nextTick(() => {self.zookeeper.close();});
                })
            });
        });
    }

    /**
     * [set zookeeper black_list]
     * @param {object}   opt: 
     * {
     *     380533076: {
     *         "anchor_uin": 380533076,
     *         "expired_time": 1462876279
     *     },
     *     380533077: {
     *         "anchor_uin": 380533077,
     *         "expired_time": 1462876279
     *     },
     * }
     */
    set(opt) {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('get zk node %s', data);
                        console.log('stat: ', stat);
                        console.log('data: ', typeof data);
                        try {
                            zkData = JSON.parse(data);
                        } catch (e) {
                            reject(e);
                            return;
                        }

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        _.extend(zkData.data, opt);
                        let currVersion = stat.version;
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node set result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('set zk node succ!');
                                resolve(stat);

                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        });
    }

    /**
     * [delete zookeeper znode]
     * @param  {array}   keys     [要删除的黑名单的QQ号]
     * @return {[type]}            [description]
     */
    delete(keys) {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('get zk node %s', data);
                        console.log('stat: ', stat);
                        console.log('data: ', typeof data);
                        try {
                            zkData = JSON.parse(data);
                        } catch (e) {
                            reject(e);
                            return;
                        }

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        for (let key of keys) {
                            delete zkData.data[key];
                        }

                        let currVersion = stat.version; // 只对这个znode被读取时的这个ersion,否则会抛错。
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node set result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('set zk node succ!');
                                resolve(stat);
                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        })

    }

    /**
     * [add description]
     * @param {[type]}   opt      [description]
     */
    add(opt) {
        // zookeeper只能以覆盖的方式set
        return this.set(opt);
    }

    clear() {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('stat: ', stat);

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        zkData.data = '';
                        let currVersion = stat.version;
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node clear result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('clear zk node succ!');
                                resolve(stat);
                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        });
    }
}

module.exports = ZK;

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏腾讯IVWEB团队的专栏

使用 Node.js 实现一个简单的 ZooKeeper 客户端

Zookeeper 是一个分布式的、开源的协调服务,用在分布式应用程序中。它提出了一组简单的原语,分布式应用程序可以基于这些原语之上构建更高层的分布式服务用于实...

1.3K0
来自专栏IT大咖说

Oracle中最容易被忽略的那些实用特性

内容来源:2017 年 04 月 08 日,ITPUB管理版版主吕海波在“DBGeeK+PG数据库技术沙龙(4月杭州站)”进行《Oracle中最容易被忽略的那些...

1106
来自专栏携程技术中心

开源 | 分布式数据包回溯工具

在复杂的网络环境中,技术人员会面临各种问题或故障需研究并解决,比如可能有系统或应用参数配置不当,也可能恶意软件感染等,都可能对正常应用造成影响。对数据包进行协议...

2676
来自专栏陈树义

1.Redis 的安装

一、Redis 介绍 Redis是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。...

2829
来自专栏恰同学骚年

Hadoop学习笔记—16.Pig框架学习

  Pig是一个基于Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫Pig Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过...

492
来自专栏美团技术团队

美团点评数据库中间件DBProxy开源

介绍 随着数据量的不断增大,传统的直连数据库对数据进行访问的方式已经无法满足一般公司的需求。通过数据库中间件,可以对数据库进行水平扩展,由原来单台数据库扩展到...

4065
来自专栏北京马哥教育

高可用集群基本概念与heartbeat文本配置接口

一、高可用集群基本概念: 什么是高可用集群: 所谓高可用集群,就是在出现故障时,可以把业务自动转移到其他主机上并让服务正常运行的集群构架 > 高...

3077
来自专栏深度学习之tensorflow实战篇

hive数据:名词解释

问题导读 1.hive数据分为那两种类型? 2.什么表数据? 3.什么是元数据? 4.Hive表里面导入数据的本质什么? 5.表、分区、桶之间之间的关系是什么?...

3557
来自专栏Java帮帮-微信公众号-技术文章全总结

JavaWeb14-事务,连接池(Java正在的全栈开发)

? 事务&连接池 一.事务 1. 事务介绍 什么是事务 事务,一般是指要做的或所做的事情。在计算机术语中是指访问并可能更新数据库中各种数据项的一个程序执行单元...

2504
来自专栏喔家ArchiSelf

老曹眼中的MySQL调优

对于全栈而言,数据库技能不可或缺,关系型数据库或者nosql,内存型数据库或者偏磁盘存储的数据库,对象存储的数据库或者图数据库……林林总总,但是第一必备技能还应...

853

扫码关注云+社区