使用 Node.js 实现一个简单的 ZooKeeper 客户端

作者:link

什么是ZooKeeper

Zookeeper 是一个分布式的、开源的协调服务,用在分布式应用程序中。它提出了一组简单的原语,分布式应用程序可以基于这些原语之上构建更高层的分布式服务用于实现同步、配置管理、分组和命名等。Zookeeper 设计的容易进行编程,它使用一种类似于文件系统的目录树结构的数据模型,以 java 方式运行,有 java 和 c 的绑定(binding)。

分布式系统中的协调服务总所周知地难于正确实现,尤其容易产生诸如争用条件 (race conditions)、死锁(deadlock) 等错误。Zookeeper 背后的动机就是减轻分布式应用程序从头做起实现协调服务的难度。

数据模型

Zookeeper 会维护一个具有层次关系的数据结构,它非常类似于一个标准的文件系统,如下图所示:

Zookeeper 这种数据结构有如下这些特点:

  1. 每个子目录项如 NameService 都被称作为 znode,这个 znode 是被它所在的路径唯一标识,如 Server1 这个 znode 的标识为 /NameService/Server1
  2. znode 可以有子节点目录,并且每个 znode 可以存储数据,注意 EPHEMERAL 类型的目录节点不能有子节点目录
  3. znode 是有版本的,每个 znode 中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
  4. znode 可以是临时节点,一旦创建这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除,Zookeeper 的客户端和服务器通信采用长连接方式,每个客户端和服务器通过心跳来保持连接,这个连接状态称为 session,如果 znode 是临时节点,这个 session 失效,znode 也就删除了
  5. znode 的目录名可以自动编号,如 App1 已经存在,再创建的话,将会自动命名为 App2
  6. znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于这个特性实现的。

简洁的API

Zookeeper 的设计目标之一就是提供简单的编程接口。于是,它只提供了以下的操作:

  • create : 在(命名空间)树的一个特定地址上创建一个节点
  • delete : 删除一个节点
  • exists : 检测在一个地址上是否存在节点
  • get data : 从节点读取数据
  • set data :将数据写入节点
  • get children :检索子节点列表
  • sync : 等待数据传播完成

谁在用?

如小米公司的米聊,其后台就采用了ZooKeeper作为分布式服务的统一协作系统。而阿里公司的开发人员也广泛使用ZooKeeper,并对其进行了适当修改,开源了一款TaoKeeper软件,以适应自身业务需要。另外还包括Apache HBase、Apache Kafka、Facebook Message等产品也都使用了ZooKeeper。

应用场景

  • 数据量比较小,但对数据可靠性要求很高的场景,比如管理分布式应用的协作数据。
  • 不能做什么
  • ZooKeeper不合适做海量存储,因为它主要用来管理分布式应用协作的关键数据。对于海量数据,不同的应用有不同的需求,如对一致性和持久性的不同需求,所以在设计应用时,最佳实践应该将应用数据和协作数据分开,况且对于海量数据我们的选择很多,如数据库或者分布式文件系统等。
  • 不要让ZooKeeper Server来管理应用程序的缓存,而应该把这些任务交给ZooKeeper客户端,因为这样会导致ZooKeeper的设计更加复杂。比如,让ZooKeeper来管理缓存失效,可能会导致ZooKeeper在运行时,停滞在等待客户端确认一个缓存失效的请求上,因为在进行所有写操作之前,都需要确认对应的缓存数据是否失效。

Node.js应用与ZooKeeper Server进行通信

那么当Node.js应用作为整个异构分布式系统中的一环,需要作为客户端去操作ZooKeeper Server上的znode时,应该如何实现?

说实话,上文介绍了这么多ZooKeeper的原理,其实作为客户端只需要单纯的把znode作为文件来操作就好,并且可以监听znode的改变,十分方便。本文只描述怎样使用Node.js实现ZooKeeper客户端角色。

node-zookeeper

node-zookeeper是ZooKeeper的一个Node.js客户端实现,这个模块是基于ZooKeeper原生提供的C API来实现的。

下载 npm install zookeeper

栗子

var ZooKeeper = require ("zookeeper");
var zk = new ZooKeeper({
  connect: "localhost:8888" // zk server的服务器地址和监听的端口号
 ,timeout: 200000 // 以毫秒为单位
 ,debug_level: ZooKeeper.ZOO_LOG_LEVEL_WARN 
 ,host_order_deterministic: false 
});
zk.connect(function (err) {
    if(err) throw err;
    console.log ("zk session established, id=%s", zk.client_id);
    zk.a_create ("/node.js1", "some value", ZooKeeper.ZOO_SEQUENCE | ZooKeeper.ZOO_EPHEMERAL, function (rc, error, path)  {
        if (rc != 0) {
            console.log ("zk node create result: %d, error: '%s', path=%s", rc, error, path);
        } else {
            console.log ("created zk node %s", path);
            process.nextTick(function () {
                zk.close ();
            });
        }
    });
});

其中:

  • connect: 包含主机名和ZooKeeper服务器的端口。
  • timeout:以毫秒为单位,表示ZooKeeper等待客户端通信的最长时间,之后会声明会话已死亡。ZooKeeper的会话一般设置超时时间5-10秒。
  • debug_level:设置日志的输出级别,有四种级别:ZOO_LOG_LEVEL_ERROR, ZOO_LOG_LEVEL_WARN, ZOO_LOG_LEVEL_INFO, ZOO_LOG_LEVEL_DEBUG
  • host_order_deterministic: 初始化zk客户端实例后,该实例是否是按确定顺序去连接ZooKeeper Server集群中的主机,直到连接成功,或者该会话被断开。

常见API:

  • connect():连接ZooKeeper Server
  • a_create (path, data, flags, path_cb): 创建一个znode,并赋值,可以决定这个znode的节点类型(永久、临时、永久有序、临时有序)
  • a_get(path, watch, data_cb): path: 我们想要获取数据的zonde节点路径。 watch: 表示我们是否想要监听该节点后续的数据变更。data_cb(rc ,error, stat, data): rc:return code,0为成功。 error:错误信息。stat:znode的元数据信息。data: znode中的数据。
  • a_set( path, data, version, stat_cb ): 需要注意的是,ZooKeeper并不允许局部写入或读取znode的数据,当设置一个znode节点的数据或读取时,znode节点的内容或被整个替换或全部读取出来。path: 我们想要设置数据的zonde节点路径。data:我们想要设置的数据,一个znode节点可以包含任何数据,数据存储为字节数组(byte array)。字节数组的具体格式特定于每个应用的实现,ZooKeeper不直接提供解析的支持,用户可以使用如Protobuf、Thrift、Avro或MessagePack等序列化协议来处理保存在znode中的数据格式,一般UTF-8编码的字符串就够用了。version:znode的version,从stat中抽取出来的。data_cb(rc, error, stat): 设置数据的回调。
  • close(): 关闭客户端连接。
  • a_exists(path, watch, stat_cb): 判断znode是否存在。
  • adelete( path, version, voidcb ):删除znode,结尾加上""是为了不和保留字"delete"冲突。。。

实现对指定znode节点数据进行CURD的ORM

'use strict'

const ZooKeeper = require('zookeeper');
const logger = require('../logger/index.js'); // 打日志的工具
const Promise = require('bluebird');
const _ = require('lodash');
let node_env = process.env.NODE_ENV ? process.env.NODE_ENV: 'development';
let connect = node_env === 'development' ? 'zktest.imweb.com:8888' : 'zk.imweb.oa.com:8888';
let timeout = 200000; // 单位毫秒
let path = node_env === 'development' ? '/zk_test/blackList' : '/zk/blackList';
let debug_level = ZooKeeper.ZOO_LOG_LEVEL_WARN;
let host_order_deterministic = false;
let defaultInitOpt = {
    connect,
    timeout,
    debug_level,
    host_order_deterministic
};

class ZK {
    constructor(opt) {
        this.opt = opt;
        this._initZook();
    }

    _initZook() {
        this.zookeeper = new ZooKeeper(this.opt.initOpt || defaultInitOpt);
    }

    /**
     * [get zookeeper blackList]
     * @return {[type]}            [description]
     */
    get() {
        return new Promise((resolve, reject) => {
            let self = this;
            self.zookeeper.connect(function(error) {
                if (error) {
                    reject(error);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(err);
                    } else {
                        logger.info('get zk node: ' + data)
                        resolve(data);                        
                    }
                    process.nextTick(() => {self.zookeeper.close();});
                })
            });
        });
    }

    /**
     * [set zookeeper black_list]
     * @param {object}   opt: 
     * {
     *     380533076: {
     *         "anchor_uin": 380533076,
     *         "expired_time": 1462876279
     *     },
     *     380533077: {
     *         "anchor_uin": 380533077,
     *         "expired_time": 1462876279
     *     },
     * }
     */
    set(opt) {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('get zk node %s', data);
                        console.log('stat: ', stat);
                        console.log('data: ', typeof data);
                        try {
                            zkData = JSON.parse(data);
                        } catch (e) {
                            reject(e);
                            return;
                        }

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        _.extend(zkData.data, opt);
                        let currVersion = stat.version;
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node set result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('set zk node succ!');
                                resolve(stat);

                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        });
    }

    /**
     * [delete zookeeper znode]
     * @param  {array}   keys     [要删除的黑名单的QQ号]
     * @return {[type]}            [description]
     */
    delete(keys) {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('get zk node %s', data);
                        console.log('stat: ', stat);
                        console.log('data: ', typeof data);
                        try {
                            zkData = JSON.parse(data);
                        } catch (e) {
                            reject(e);
                            return;
                        }

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        for (let key of keys) {
                            delete zkData.data[key];
                        }

                        let currVersion = stat.version; // 只对这个znode被读取时的这个ersion,否则会抛错。
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node set result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('set zk node succ!');
                                resolve(stat);
                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        })

    }

    /**
     * [add description]
     * @param {[type]}   opt      [description]
     */
    add(opt) {
        // zookeeper只能以覆盖的方式set
        return this.set(opt);
    }

    clear() {
        let zkData = null;
        let self = this;
        return new Promise((resolve, reject) => {
            self.zookeeper.connect(function(err) {
                if (err) {
                    reject(err);
                    return;
                }
                console.log('zk session established, id=%s', self.zookeeper.client_id);

                self.zookeeper.a_get(path, null, function(rc, error, stat, data) {
                    if (rc !== 0) {
                        console.log('zk node get result: %d, error: "%s", stat=%s, data=%s', rc, error, stat, data);
                        reject(error);
                    } else {
                        console.log('stat: ', stat);

                        zkData.last_update_time = parseInt(new Date().getTime() / 1000, 10);
                        zkData.data = '';
                        let currVersion = stat.version;
                        try {
                            zkData = JSON.stringify(zkData);
                        } catch (e) {
                            reject(e);
                            return;
                        }
                        self.zookeeper.a_set(path, zkData, currVersion, function(rc, error, stat) {
                            if (rc !== 0) {
                                console.log('zk node clear result: %d, error: "%s", stat=%s', rc, error, stat);
                                reject(error);
                            } else {
                                logger.info('clear zk node succ!');
                                resolve(stat);
                            }
                            process.nextTick(function() {
                                self.zookeeper.close();
                            });
                        })

                    }
                })
            });
        });
    }
}

module.exports = ZK;

原文链接:http://ivweb.io/topic/579db07093d9938132cc8d85

相关推荐 通过ffi在Node.js中调用动态链接库(.so/.dll文件) 一次 Node.js 内存溢出

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏数据和云

性能优化:Linux环境下合理配置大内存页

熊军(老熊) 云和恩墨西区总经理 Oracle ACED,ACOUG核心会员 PC Server发展到今天,在性能方面有着长足的进步。64位的CPU在数年前都已...

4205
来自专栏李鹏的专栏

JAVA 高并发设计

同步和异步通常用来形容一次方法调用,同步方法,调用者必须等到方法调用返回后,才能继续后续的行为,异步方法调用会立即返回,调用者就可以继续后续的操作。

3520
来自专栏田京昆的专栏

基于hashicorp/raft的分布式一致性实战教学

hashicorp/raft是raft算法的一种比较流行的golang实现,基于它能够比较方便的构建具有强一致性的分布式系统。本文通过实现一个简单的分布式缓存系...

1.5K14
来自专栏吴伟祥

springmvc+maven+netty-socketio服务端构建实时通信

WebSocket是HTML5的一种新通信协议,它实现了浏览器与服务器之间的双向通讯。而Socket.IO是一个完全由JavaScript实现、基于Node.j...

872
来自专栏xingoo, 一个梦想做发明家的程序员

跟着ZooKeeper学Java——CountDownLatch和Join的使用

在阅读ZooKeeper的源码时,看到这么一个片段,在单机模式启动的时候,会调用下面的方法,根据zoo.cfg的配置启动单机版本的服务器: public voi...

3214
来自专栏hoop

基于hashicorp/raft的分布式一致性实战教学

对于后台开发来说,随着业务的发展,由于访问量增大的压力和数据容灾的需要,一定会需要使用分布式的系统,而分布式势必会引入一致性的问题。

1741
来自专栏容器云生态

redis超时原因系统性排查

1.计算延迟时间: 使用–latency参数  以下参数表示平均超时时间0.03ms。 redis-cli --latency -h 127.0.0.1 ...

7406
来自专栏Java技术

Redis简介以及和其他缓存数数据库的区别

Redis 是一个开源的内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。

722
来自专栏张善友的专栏

WS-Eventing、WS-Transfer Web服务标准

传输(Transfer)     WS-Transfer详细说明了对通过Web服务进行访问的数据实体进行管理所需的基本操作。要了解WS-Transfer需要介绍...

17410
来自专栏阿杜的世界

【译】Linux概念架构的理解摘要一、Linux内核在整个计算机系统中的位置二、内核的作用三、Linux内核的整体架构四、高度模块化设计的系统,利于分工合作。五、系统中的数据结构六、子系统架构七、结论

声明:本文翻译自Conceptual Architecture of the Linux Kernel

694

扫码关注云+社区