客户端使用C语言开发,zookeeper提供了两个库,zookeeper_st(单线程库)以及zookeeper_mt(多线程库)。
zookeeper_st提供了异步API和集成在应用程序用来实现事件循环的回调函数,该库是为了支持pthread库不支持或是不稳定的系统而存在。使用过程中需要通过zoo_interest以及zoo_process实现事件处理以及通知机制。
一般情况下使用zookeeper_mt多线程库,多线程库分为三个线程:主线程、io线程以及completion线程。主线程就是调用API的线程,io线程负责网络通信,而对于异步请求以及watcher的响应,io线程会发送给completion线程完成处理。
Zookeeper C API中的各种回调函数原型如下:
监视函数(watcher funciton)原型
typedef void (*watcher_fn)(zhandle_t *zh, int type, int state, const char *path,void *watcherCtx);
监视函数参数
其他回调函数原型
Zookeeper 中还有几种在异步 API(一般以 zoo_a*开头的函数) 中使用的回调函数,根据回调函数处理异步函数返回值类型的不同分为以下几类:
具体如下所示:
// 处理返回 void 类型的回调函数
typedef void(* void_completion_t)(int rc, const void *data);
其中rc是异步返回的错误码,data是传入回调函数的自定义参数(下同)。
// 处理返回 Stat 结构的回调函数
typedef void(* stat_completion_t)(int rc, const struct Stat *stat, const void *data);
其中stat指向与znode相关的Stat信息(下同)。
// 处理返回字符串的回调函数
typedef void(* string_completion_t)(int rc, const char *value, const void *data);
其中value返回的字符串。
// 处理返回数据的回调函数
typedef void(* data_completion_t)(int rc, const char *value, int value_len, const struct Stat *stat, const void *data);
其中value表示返回的字节数组,value_len是字节长度。
// 处理返回字符串列表(a list of string)的回调函数
typedef void(* strings_completion_t)(int rc, const struct String_vector *strings, const void *data);
其中strings指向了某个znode节点的所有子节点名称列表结构(下同)。
// 同时处理返回字符串列表(a list of string)和 Stat 结构的回调函数
typedef void(* strings_stat_completion_t)(int rc, const struct String_vector *strings, const struct Stat *stat, const void *data);
// 处理以及返回 ACL 信息的回调函数
typedef void(* acl_completion_t)(int rc, struct ACL_vector *acl, struct Stat *stat, const void *data);
其中acl指向包含某个节点ACL信息的指针。
ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,
int recv_timeout,
const clientid_t * clientid,
void *context, int flags);
参数:
ZOOAPI void zoo_set_debug_level(ZooLogLevel logLevel);
ZOOAPI void zoo_set_log_stream(FILE * logStream);
ZOOAPI struct sockaddr *zookeeper_get_connected_host(zhandle_t * zh, struct sockaddr
*addr,
socklen_t * addr_len);
ZOOAPI int zookeeper_interest(zhandle_t * zh, int *fd, int *interest,
struct timeval *tv);
返回向zookeeper中注册的事件列表
参数:
参数:
创建、删除节点
ZOOAPI int zoo_create(zhandle_t * zh, const char *path,
const char *value, int valuelen,
const struct ACL_vector *acl, int flags,
char *path_buffer, int path_buffer_len);
参数:
ZOOAPI int zoo_delete(zhandle_t * zh, const char *path, int version);
参数:
检查节点状态
两者的区别就是后者可以指定单独的watcher_fn(监视器回调函数),前者只能用使用zookeeper_init设置的全局监视器回调函数,下同。
ZOOAPI int zoo_exists(zhandle_t * zh, const char *path, int watch,
struct Stat *stat);
参数(下同):
获取节点数据
ZOOAPI int zoo_get(zhandle_t * zh, const char *path, int watch,
char *buffer, int *buffer_len, struct Stat *stat);
参数:
ZOOAPI int zoo_get_children(zhandle_t * zh, const char *path,
int watch, struct String_vector *strings);
参数(下同):
初学Zookeeper到这里肯定会有疑问,Watcher和AsyncCallBack之间的区别是什么?在介绍异步接口之前先来回顾一下Watcher和AsyncCallBack之间的区别和实现。
Watcher是用于监听节点、session状态的,比如get方法对节点设置了watcher之后,当节点的数据发生了改变之后,服务器会主动发送notification给客户端,然后进行watcher的回调。
AsyncCallBack是以异步的方式调用API,主动向服务器发送请求,然后将请求放入到pending队列中,等待服务器的响应。收到服务器对应的响应后,进行回调。
Zookeeper客户端中Watcher和AsyncCallback都是异步回调的方式,但它们回调的时机是不一样的,前者是由服务器发送事件触发客户端回调,后者是在执行了请求后得到响应后客户端主动触发的。它们的共同点在于都需要在获取了服务器响应之后,由io线程将事件注册到completion线程的事件队列中,然后由completion线程从队列中逐个取出并处理。
创建、删除节点
ZOOAPI int zoo_acreate(zhandle_t * zh, const char *path,
const char *value, int valuelen,
const struct ACL_vector *acl, int flags,
string_completion_t completion, const void *data);
其中参数 string_completion_t completion 即返回字符串的回调函数,那么当 zoo_acreate 调用结束时将会触发 completion 回调函数的调用,同时传递给 completion 的 rc 参数为: ZOK 操作完成;ZNONODE 父节点不存在;ZNODEEXISTS 节点已存在;ZNOAUTH 客户端没有权限创建节点。ZNOCHILDRENFOREPHEMERALS 临时节点不能创建子节点。而 string_completion_t completion 中 const char value 参数即新节点的路径名(注:如果 zoo_acreate 设置了ZOO_SEQUENCE ,则创建节点成功后,节点名称并不是 zoo_acreate 中 path 参数所指定的名称,而是类似与 /xyz0000000001,/xyz0000000002... 的名称)。另外,string_completion_t completion 中 const void data 参数即为 zoo_acreate 中的 const void *data。
参数(下同):
ZOOAPI int zoo_awget(zhandle_t * zh, const char *path,
watcher_fn watcher, void *watcherCtx,
data_completion_t completion, const void *data);
ZOOAPI int zoo_aset(zhandle_t * zh, const char *path,
const char *buffer, int buflen, int version,
stat_completion_t completion, const void *data);
ZOOAPI int zoo_aget_children(zhandle_t * zh, const char *path,
int watch,
strings_completion_t completion,
const void *data);
ZOOAPI int zoo_awget_children(zhandle_t * zh, const char *path,
watcher_fn watcher, void *watcherCtx,
strings_completion_t completion,
const void *data);
ZOOAPI int zoo_aget_children2(zhandle_t * zh, const char *path,
int watch,
strings_stat_completion_t completion,
const void *data);
ZOOAPI int zoo_awget_children2(zhandle_t * zh, const char *path,
watcher_fn watcher, void *watcherCtx,
strings_stat_completion_t completion,
const void *data);
ZOOAPI int zoo_async(zhandle_t * zh, const char *path,
string_completion_t completion, const void *data);
使用get命令获取指定节点数据时,也会返回节点的状态信息Stat,该结构包含如下字段:
错误码 | 说明 |
---|---|
ZOK | 正常返回 |
ZSYSTEMERROR | 系统或服务器端错误(System and server-side errors),服务器不会抛出该错误,该错误也只是用来标识错误范围的,即大于该错误值,且小于 ZAPIERROR 都是系统错误 |
ZRUNTIMEINCONSISTENCY | 运行时非一致性错误 |
ZDATAINCONSISTENCY | 数据非一致性错误 |
ZCONNECTIONLOSS | Zookeeper 客户端与服务器端失去连接 |
ZMARSHALLINGERROR | 在 marshalling 和 unmarshalling 数据时出现错误(Error while marshalling or unmarshalling data) |
ZUNIMPLEMENTED | 该操作未实现(Operation is unimplemented) |
ZOPERATIONTIMEOUT | 该操作超时(Operation timeout) |
ZBADARGUMENTS | 非法参数错误(Invalid arguments) |
ZINVALIDSTATE | 非法句柄状态(Invliad zhandle state) |
ZAPIERROR | API 错误(API errors),服务器不会抛出该错误,该错误也只是用来标识错误范围的,错误值大于该值的标识 API 错误,而小于该值的标识 ZSYSTEMERROR |
ZNONODE | 节点不存在(Node does not exist) |
ZNOAUTH | 没有经过授权(Not authenticated) |
ZBADVERSION | 版本冲突(Version conflict) |
ZNOCHILDRENFOREPHEMERALS | 临时节点不能拥有子节点(Ephemeral nodes may not have children) |
ZNODEEXISTS | 节点已经存在(The node already exists) |
ZNOTEMPTY | 该节点具有自身的子节点(The node has children) |
ZSESSIONEXPIRED | 会话过期(The session has been expired by the server) |
ZINVALIDCALLBACK | 非法的回调函数(Invalid callback specified) |
ZINVALIDACL | 非法的ACL(Invalid ACL specified) |
ZAUTHFAILED | 客户端授权失败(Client authentication failed) |
ZCLOSING | Zookeeper 连接关闭(ZooKeeper is closing) |
ZNOTHING | 并非错误,客户端不需要处理服务器的响应(not error, no server responses to process) |
ZSESSIONMOVED | 会话转移至其他服务器,所以操作被忽略(session moved to another server, so operation is ignored) |
状态码 | 说明 |
---|---|
-112 | 会话超时(ZOO_EXPIRED_SESSION_STATE) |
-113 | 认证失败(ZOO_AUTH_FAILED_STATE) |
1 | 连接建立中(ZOO_CONNECTING_STATE) |
2 | 连接建立中(ZOO_ASSOCIATING_STATE) |
3 | 连接已建立(ZOO_CONNECTED_STATE) |
999 | 无连接状态 |
事件码 | 说明 |
---|---|
1 | 创建节点事件(ZOO_CREATED_EVENT) |
2 | 删除节点事件(ZOO_DELETED_EVENT) |
3 | 更改节点事件(ZOO_CHANGED_EVENT) |
4 | 子节点列表变化事件(ZOO_CHILD_EVENT) |
-1 | 会话session事件(ZOO_SESSION_EVENT) |
-2 | 监视被移除事件(ZOO_NOTWATCHING_EVENT) |
Watcher的设置和获取在开发中很常见,不同的操作会收到不同的watcher信息。对父节点的变更以及孙节点的变更都不会触发watcher,而对watcher本身节点以及子节点的变更会触发watcher。
exists、getdata以及getchildren方法获取watcher可参考如下:
操作 | 方法 | 触发watcher | watcher state | watcher type | watcher path |
---|---|---|---|---|---|
Create当前节点 | getdata | × | × | × | × |
| getchildren | √ | 3 | 4 | √ |
| exists | × | × | × | × |
set当前节点 | getdata | √ | 3 | 3 | √ |
| getchildren | × | × | × | × |
| exists | √ | 3 | 3 | √ |
delete当前节点 | getdata | √ | 3 | 2 | √ |
| getchildren | √ | 3 | 2 | √ |
| exists | √ | 3 | 2 | √ |
create子节点 | getdata | × | × | × | × |
| getchildren | √ | 3 | 4 | √ |
| exists | × | × | × | × |
set子节点 | getdata | × | × | × | × |
| getchildren | × | × | × | × |
| exists | × | × | × | × |
delete子节点 | getdata | × | × | × | × |
| getchildren | √ | 3 | 4 | √ |
| exists | × | × | × | × |
恢复连接 | getdata | √ | 1 | -1 | × |
| getchildren | √ | 1 | -1 | × |
| exists | √ | 1 | -1 | × |
恢复连接session未超时 | getdata | √ | -112 | -1 | × |
| getchildren | √ | -112 | -1 | × |
| exists | √ | -112 | -1 | × |
恢复连接session超时 | getdata | √ | 3 | -1 | × |
| getchildren | √ | 3 | -1 | × |
| exists | √ | 3 | -1 | × |
int main(int argc, const char *argv[]) {
/* 设置zookeeper host,其中host字符串格式为逗号隔开的IP:PORT对 */
static const char* host = "x.x.x.x:x";
/* 设置客户端连接服务器超时时间,单位ms */
int timeout = 30000;
/* 设置日志为调试级别 */
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
/* 用于启用或停用quarum端点的随机化排序,通常仅在测试时使用。
如果非0,使得client连接到quarum端按照被初始化的顺序。
如果是0,zookeeper_init将变更端点顺序,使得client连接分布在更优的端点上。*/
zoo_deterministic_conn_order(1);
/* 初始化zookeeper句柄,传入全局监视器回调函数、超时、以及上下文信息 */
zhandle_t* zkhandle = zookeeper_init(host, zk_watcher_g, 30000, 0, (void *)"hello world", 0);
if (zkhandle == NULL) {
fprintf(stderr, "Error when connecting to zookeeper servers...\n");
exit(EXIT_FAILURE);
}
/* 等待异步回调 */
int fd, interest, events;
int rc;
fd_set rfds, wfds, efds;
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
while (1) {
struct timeval tv;
/* 注册zookeeper中的事件 */
zookeeper_interest(zkhandle, &fd, &interest, &tv);
if (fd != -1) {
if (interest & ZOOKEEPER_READ) {
FD_SET(fd, &rfds);
} else {
FD_CLR(fd, &rfds);
}
if (interest & ZOOKEEPER_WRITE) {
FD_SET(fd, &wfds);
} else {
FD_CLR(fd, &wfds);
}
} else {
fd = 0;
}
if (select(fd + 1, &rfds, &wfds, &efds, &tv) < 0) {
printf("[%s %d]select failed, err=%d, msg=%s\n", __FUNCTION__, __LINE__,
errno, strerror(errno));
}
events = 0;
if (FD_ISSET(fd, &rfds)) {
events |= ZOOKEEPER_READ;
}
if (FD_ISSET(fd, &wfds)) {
events |= ZOOKEEPER_WRITE;
}
/* 通知zookeeper客户端事件已经发生 */
zookeeper_process(zkhandle, events);
}
zookeeper_close(zkhandle);
}
void zk_watcher_g(zhandle_t *zh, int type, int state, const char* path, void *watcherCtx) {
int ret;
const clientid_t *zk_clientid;
/* 声明节点路径 */
if(path == NULL || strlen(path) == 0) {
path = "/xyz";
}
/* 当前event type是会话 */
if(type == ZOO_SESSION_EVENT) {
if(state == ZOO_EXPIRED_SESSION_STATE) {
printf("[%s %d] zookeeper session expired\n", __FUNCTION__, __LINE__);
}
/* 会话状态是已连接 */
else if(state == ZOO_CONNECTED_STATE) {
/* 获取客户端的 session id,只有在客户端的当前连接状态有效时才可以 */
zk_clientid = zoo_client_id(zh);
/* 调用异步api */
ret = zoo_aexists(zh, path, TRUE, zk_stat_completion, path);
ret = zoo_aget(zh, path, TRUE, zk_data_completion, "get param");
ret = zoo_aget_children2(zh, path, TRUE, zk_strings_stat_completion, "get children param");
}
}
/* 当前event type是节点创建事件 */
else if (type == ZOO_CREATED_EVENT) {
...
}
/* 当前event type是节点删除事件 */
else if (type == ZOO_DELETED_EVENT) {
...
}
/* 当前event type是节点数据改变事件 */
else if (type == ZOO_CHANGED_EVENT) {
...
}
/* 当前event type是子节点事件 */
else if (type == ZOO_CHILD_EVENT) {
...
}
}
单线程程序链接需要libzookeeper_st库
Something happened.
type: SESSION_EVENT
state: 4261760s
path:
watcherCtx: hello world
[zk_watcher_g 140] connected to zookeeper server with clientid=96408539334299438
in stat completion rc = 0 data = /xyz stat:
tctime = Sun Nov 20 10:12:08 2016
| tczxid = 609cd7384
tmtime = Sun Nov 20 10:12:08 2016
| tmzxid = 609cd7384
version = 0in data completion []: rc = 0
tctime = Sun Nov 20 10:12:08 2016
| tczxid = 609cd7384
tmtime = Sun Nov 20 10:12:08 2016
| tmzxid = 609cd7384
version = 0in strings stat completion [get children param]: rc = 0, string count 0
tctime = Sun Nov 20 10:12:08 2016
| tczxid = 609cd7384
tmtime = Sun Nov 20 10:12:08 2016
| tmzxid = 609cd7384
在程序执行前首先建立/xyz节点,服务器首次返回的事件类型为SESSION_EVENT,显示客户端连接已成功。
version = 0Something happened.
type: CHILD_EVENT
state: 4261760s
path: /xyz
watcherCtx: hello world
path (null) child event
in data completion []: rc = 0
tctime = Sun Nov 20 10:12:08 2016
| tczxid = 609cd7384
tmtime = Sun Nov 20 10:12:08 2016
| tmzxid = 609cd7384
version = 0in strings stat completion [get children param]: rc = 0, string count 1
0: abc
tctime = Sun Nov 20 10:12:08 2016
| tczxid = 609cd7384
tmtime = Sun Nov 20 10:12:08 2016
| tmzxid = 609cd7384
创建节点/xyz/abc,服务器返回的事件类型是CHILD_EVENT,表示的是子节点事件,同时返回子节点列表。
version = 0Something happened.
type: CHANGED_EVENT
state: 4261760s
path: /xyz
watcherCtx: hello world
path /xyz changed
in strings stat completion [get children param]: rc = 0, string count 1
0: abc
tctime = Sun Nov 20 10:12:08 2016
| tczxid = 609cd7384
tmtime = Sun Nov 20 10:16:56 2016
| tmzxid = 609cd7449
version = 1in data completion []: rc = 0
tctime = Sun Nov 20 10:12:08 2016
| tczxid = 609cd7384
tmtime = Sun Nov 20 10:16:56 2016
| tmzxid = 609cd7449
设置节点数据/xyz,服务器返回的事件类型是CHANGED_EVENT,表示的节点数据改变事件,
多线程库api使用相对单线程就比较简单了
int main(int argc, const char *argv[]) {
if (argc != 2) {
printf("usage: zookeeper_m_test server_address\n");
exit(-1);
}
const char* host = argv[1];
int timeout = 30000;
char buffer[512];
int *bufferlen;
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
/* 初始化zookeeper句柄,传入全局监视器回调函数、超时、以及简单参数 */
zhandle_t* zkhandle = zookeeper_init(host, zookeeper_watcher_g, timeout, 0,
NULL, 0);
if (zkhandle == NULL) {
fprintf(stderr, "Error when connecting to zookeeper servers...\n");
exit(EXIT_FAILURE);
}
/* 调用api */
create(zkhandle, "/abc");
exists(zkhandle, "/abc");
getACL(zkhandle, "/abc");
del(zkhandle, "/abc");
while (1)
;
zookeeper_close(zkhandle);
}
和上面单线程类似
多线程程序需要链接libzookeeper_mt以及pthread两个库
Something happened.
type: -1
state: 3
path:
watcherCtx: hello zookeeper.
[acreate]: rc = -110
aexists: rc = 0 Stat:
ctime = Sun Nov 20 00:25:19 2016
czxid=609ccd4de
mtime=Sun Nov 20 00:26:04 2016
mzxid=609ccd4e9
version=1 aversion=0
ephemeralOwner = 0
...
Something happened.
type: 3
state: 3
path: /xyz
watcherCtx: hello zookeeper.
...
[adelete]: rc = 0
这里可以看到创建session成功的时候,服务器返回状态码state=3,事件码type=-1。
而改变/xyz节点数据的时候,服务器返回状态码state=3,事件码type=3(由于这里设置了watcher,因此当节点改变的时候会通知一次客户端)。
最后成功删除节点/xyz。
http://blog.jobbole.com/104833/
http://blog.csdn.net/wudongxu/article/details/9198963
http://www.cnblogs.com/haippy/archive/2013/02/21/2920426.html
http://www.cnblogs.com/haippy/archive/2013/02/24/2924567.html
http://www.cnblogs.com/haippy/archive/2013/02/21/2920426.html
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。