前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Zookeeper】Leader选举机制示例(异步API)

【Zookeeper】Leader选举机制示例(异步API)

作者头像
王亚昌
发布2018-08-03 15:44:30
7210
发布2018-08-03 15:44:30
举报
文章被收录于专栏:王亚昌的专栏王亚昌的专栏

    上一篇文章中介绍了如何用同步API实现Leader选举机制,本文也借用本一个场景,简单介绍异步API的使用。管理异步API的使用,可以方便大家在一些单进程系统中使用zk。提到异步API的使用,需要先了解zk里的线程模型。

zk的线程模型如下图所示,

调用zookeeper_init后将创建两个线程: I/O线程:该线程主要完成三个任务,一是建立对服务器组的连接以及当连接失效时对服务器组的重连,二是当会话空闲时间超过1/3的超时时间后,将主动向服务器发送PING请求以保持keep alive,三是通过poll来管理FD,读取服务器的响应数据加入到数据队列中。 Completion线程:该线程循环读取数据队列,并调用相应的回调函数。zoo_aget,zoo_acreate等异步函数将直接向服务器端发送请求数据。

从线程模型上看,需要注意,在主线程中使用zk数据时,主要做好和completion线程的数据同步,避免出现读写竞争。

zk的完成函数族如下:

代码语言:javascript
复制
typedef void (*void_completion_t)(int rc, const void *data);

typedef void (*stat_completion_t)(int rc, const struct Stat *stat, const void *data);

typedef void (*data_completion_t)(int rc, const char *value, int value_len, const struct Stat *stat, const void *data);

typedef void (*strings_completion_t)(int rc, const struct String_vector *strings, const void *data);

typedef void (*strings_stat_completion_t)(int rc, const struct String_vector *strings, const struct Stat *stat, const void *data);

typedef void (*string_completion_t)(int rc, const char *value, const void *data);

typedef void (*acl_completion_t)(int rc, struct ACL_vector *acl, struct Stat *stat, const void *data);

本文用到了 data_completion和strings_completion,需要注意的是回调的触发顺序,以zoo_awget_children为例,函数说明如下:

代码语言:javascript
复制
ZOOAPI int zoo_awget_children(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        strings_completion_t completion, const void *data);

当path节点的节点数据变化时,会先触发watcher回调函数,再调用strings_completion函数。

示例代码如下:

代码语言:javascript
复制
#include <zookeeper.h>
#include <zookeeper_log.h>
#include <iostream>
#include <string.h>

using namespace std;

const char* host = "127.0.0.1:2181";
const int timeout = 2000;
zhandle_t* zkhandle = NULL;
bool is_connect = false;

char path[] = "/app_watch";
int ret = 0;
char buffer[1024];
int buff_len;
Stat stat;

void clients_watcher_g(zhandle_t * zh, int type, int state, const char* path, void* watcherCtx) {
    printf("global watcher - type:%d,state:%d\n", type, state);
    if ( type==ZOO_SESSION_EVENT ) {
        if ( state==ZOO_CONNECTED_STATE ) {
            printf("connected to zookeeper service successfully!\n");
            printf("timeout:%d\n", zoo_recv_timeout(zh));
            is_connect = true;
        }
        else if ( state==ZOO_EXPIRED_SESSION_STATE ) {
            printf("zookeeper session expired!\n");
        }
    }
    else {
        printf("other type:%d\n", type);
    }
}

void data_complete(int rc, const char *value, int value_len, const struct Stat *stat, const void *data)
{
	printf("[data_complete] data[%s] \n", data);
}

void string_complete(int rc, const struct String_vector *strings, const void *data)
{
	printf("[string_complete] data[%s] \n", data);
    const struct String_vector & str_vec = *strings;
    for (int i = 0; i < str_vec.count; i++) {
        printf("data[%d] data:[%s]\n", i, str_vec.data[i]);
    }
}

void child_watch_cb(zhandle_t* zh, int type, int state, const char* path, void* watcher) {
    printf("call child watch cb\n");
    ret = zoo_awget_children(zkhandle, path, child_watch_cb, NULL, string_complete, buffer);
    if (ZOK != ret) {
        printf("zookeeper wget error\n");
    }
}

void watch_cb(zhandle_t* zh, int type, int state, const char* path, void* watcher) {
    printf("call watch cb\n");

    // watch path
	buff_len = sizeof(buffer);
	bzero(buffer, buff_len);
    //ret = zoo_wget(zkhandle, path, watch_cb, NULL, buffer, &buff_len, &stat);
    ret = zoo_awget(zkhandle, path, watch_cb, NULL, data_complete, buffer);
	printf("ret=%d stat=%d len=%d buffer=%s\n", ret, stat.ctime, buff_len, buffer);
}

int main(int argc, char* argv[]) 
{
    //Init and connect
    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    if (zkhandle) 
        zookeeper_close(zkhandle);

    zkhandle = zookeeper_init(host, clients_watcher_g, timeout, 0, NULL, 0);
    if (NULL == zkhandle) {
        printf("zookeeper init error\n");
        return 0;
    }

    // watch path
	buff_len = sizeof(buffer);
	bzero(buffer, buff_len);
    ret = zoo_awget(zkhandle, path, watch_cb, NULL, data_complete, buffer);
	//printf("ret=%d stat=%d len=%d buffer=%s\n", ret, stat.ctime, buff_len, buffer);

    ret = zoo_awget_children(zkhandle, path, child_watch_cb, NULL, string_complete, buffer);
    if (ZOK != ret) {
        printf("zookeeper wget error\n");
        return 0;
    }

    while(1) {
        printf("sleep...\n");
        sleep(5);
    }

    zookeeper_close(zkhandle);
}

执行一下客户端,先启动一个服务器实例,然后再关闭,查看输出如下:

代码语言:javascript
复制
sleep...
global watcher - type:-1,state:3
connected to zookeeper service successfully!
timeout:4000
[data_complete] data[] 
[string_complete] data[] 

call child watch cb //启动了一个服务器实例
[string_complete] data[] 
data[0] data:[0000000038]
sleep...

sleep...
call child watch cb //关闭服务器实例
[string_complete] data[] 
sleep...
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016年03月15日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档