前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >zookeeper快速入门——应用(两种分布式锁)

zookeeper快速入门——应用(两种分布式锁)

作者头像
方亮
发布2019-01-16 15:57:03
7040
发布2019-01-16 15:57:03
举报
文章被收录于专栏:方亮方亮方亮

        在《zookeeper快速入门——简介》一文中,我们介绍了zookeeper的机制。但是还是比较抽象,没有直观感受到它在分布式系统中的应用。本文我们使用一个例子,三次迭代演进,来说明Zookeeper Client端和Server端如何配合以实现分布式协作。(转载请指明出于breaksoftware的csdn博客)

        为了例子足够简单明确,我们以实现“分布式锁”为例。所谓分布式锁,就是在一个分布式系统中,各个子系统可以共享的同一把“锁”。这样大家可以在这把锁的协调下,进行协作。

        我们可以尝试在Zookeeper Server的节点树上创建一个特定名称的节点。如果创建成功了,则认为获取到了锁。Client可以执行相应业务逻辑,然后通知Server删除该节点以释放锁。其他Client可能在此时正好去创建该节点,并成功了,那么它就获得了锁。其他创建失败的Client则被认为没有获得锁,则继续等待和尝试。

        可能此时你已经意识到一个问题:如果某个获得锁的Client和Server断开了连接,而没有机会通知Server删除test_lock文件。那就导致整个系统处于“死锁”状态。

        不用担心,zookeeper设计了“临时”节点的概念。“临时”节点由Client向Server端请求创建,一旦Client和Server连接断开,这个Client创建的“临时”节点将被删除。这样我们就不用担心因为连接断开而导致的问题了。和普通节点一样,“临时”节点也可以被Client主动删除。

        基本思路理清楚后,我们开始着手编写这块逻辑。为了简单,我们在一个进程内部使用多线程技术模拟分布在不同机器上的Client端。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <zookeeper.h>
#include <zookeeper_log.h>

        第一步我们要使用zookeeper_init方法去连接Zookeeper Server。该函数原型如下

ZOOAPI zhandle_t* zookeeper_init (	
	const char * 	host,
	watcher_fn 	watcher,
	int 	recv_timeout,
	const clientid_t * 	clientid,
	void * 	context,
	int 	flags	 
)	

        该方法创建了一个zhandle_t指针和一个与之绑定的连接session,之后我们将在一直要使用这个指针和Server进行通信。

     但是这个函数有个陷阱:即使返回了一个可用指针,可是与之绑定的session此时不一定可用。我们需要等到ZOO_CONNECTED_STATE消息到来才能确认。此时我们就要借助zookeeper中无处不在的监视功能(watcher)。

         zookeeper_init方法第二个参数传递的是一个回调函数地址——watcher,第五个参数传递的是这个回调函数可以使用的上下文信息——context。

        为了让回调函数可以通知工作线程session已经可用,我们可以把上下文信息设置为一个包含条件变量的结构watchctx_t

typedef struct watchctx_t {
    pthread_cond_t cond;
    pthread_mutex_t cond_lock;
} watchctx_t;

        这样在回调函数中,如果我们收到ZOO_CONNECTED_STATE通知,就触发条件变量

void main_watcher(zhandle_t* zh, int type, int state,
        const char* path, void* watcherCtx)
{
    if (type == ZOO_SESSION_EVENT) {
        watchctx_t *ctx = (watchctx_t*)watcherCtx;
        if (state == ZOO_CONNECTED_STATE) {
            pthread_cond_signal(&ctx->cond);
        }
    }
}

        在调用zookeeper_init方法后,工作线程一直等待条件变量,如果超过设置的超时时间,就认为连接失败

int init_watchctx(watchctx_t* ctx) {
    if (0 != pthread_cond_init(&ctx->cond, NULL)) {
        fprintf(stderr, "condition init error\n");
        return -1;
    }

    if (0 != pthread_mutex_init(&ctx->cond_lock, NULL)) {
        fprintf(stderr, "mutex init error\n");
        pthread_cond_destroy(&ctx->cond);
        return -2;
    }

    return 0;
}
zhandle_t* init() {
    const char* host = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    int timeout = 30000;
    zhandle_t* zh = NULL;

    watchctx_t ctx;
    if (0 != init_watchctx(&ctx)) {
        return zh;
    }

    zh = zookeeper_init(host, main_watcher, timeout, 0, &ctx, 0);
    if (zh == NULL) {
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");
        pthread_cond_destroy(&ctx.cond);
        pthread_mutex_destroy(&ctx.cond_lock);
        return zh;
    }

    struct timeval now;  
    struct timespec outtime;     
    gettimeofday(&now, NULL);  
    outtime.tv_sec = now.tv_sec + 1;  
    outtime.tv_nsec = now.tv_usec * 1000;

    pthread_mutex_lock(&ctx.cond_lock);
    int wait_result = pthread_cond_timedwait(&ctx.cond, &ctx.cond_lock, &outtime);
    pthread_mutex_unlock(&ctx.cond_lock);
    
    pthread_cond_destroy(&ctx.cond);
    pthread_mutex_destroy(&ctx.cond_lock);

    if (0 != wait_result) {
        fprintf(stderr, "Connecting to zookeeper servers timeout...\n");
        zookeeper_close(zh);
        zh = NULL;
        return zh;
    }

    return zh;
}

        解决了连接问题,后面的逻辑就简单了。我们使用zoo_create方法创建一个路径为/test_lock的临时节点,然后通过返回结果判断是否获得锁

void thread_routine(void* ptr) {

    zhandle_t* zh = init();
    if (!zh) {
        return;
    }

    const char* lock_data = "lock";
    const char* lock_path =  "/test_lock";
    int ret = ZNODEEXISTS;
    do {
        ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
            &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
        if (ZNODEEXISTS == ret) {
            //fprintf(stderr, "lock exist\n");
            continue;
        }
        else if (ZOK == ret) {
            pthread_t pid = pthread_self();
            fprintf(stdout, "%lu get lock\n", (long long)pid);
            zoo_delete(zh, lock_path, -1);
            sleep(1);
        }   
        else {
            fprintf(stderr, "Error %d for %s\n", ret, "create");
            break;
        }
    } while (1);

    zookeeper_close(zh);
}

        上述代码19行开始的逻辑表示这个线程获取了锁,它只是简单的打印出get lock,然后调用zoo_delete删除节点——释放锁。

        这个函数使用一个while死循环来控制业务进行,这种不停调用zoo_create去检测是否获得锁的方法非常浪费资源。那我们如何对这个函数进行改造?

        如果我们可以基于事件驱动监控/test_lock节点状态就好了。zookeeper也提供了这种方式——还是watcher。

void thread_routine(void* ptr) {

    zhandle_t* zh = init();
    if (!zh) {
        return;
    }

    watchctx_t ctx;
    if (0 != init_watchctx(&ctx)) {
        return;
    }

    const char* lock_data = "lock";
    const char* lock_path = "/test_lock";
    int ret = ZNODEEXISTS;
    do {
        struct Stat stat;
        int cur_st = zoo_wexists(zh, lock_path, lock_watcher, &ctx, &stat);
        if (ZOK == cur_st) {
            //fprintf(stdout, "wait\n");
            pthread_mutex_lock(&ctx.cond_lock);
            pthread_cond_wait(&ctx.cond, &ctx.cond_lock);
            pthread_mutex_unlock(&ctx.cond_lock);
        }

        ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
            &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
        if (ZNODEEXISTS == ret) {
            //fprintf(stderr, "lock exist\n");
        }
        else if (ZOK == ret) {
            pthread_t pid = pthread_self();
            fprintf(stdout, "%lu get lock\n", (long long)pid);
            zoo_delete(zh, lock_path, -1);
        }   
        else {
            fprintf(stderr, "Error %d for %s\n", ret, "create");
        }

    } while (1);

    zookeeper_close(zh);
    pthread_cond_destroy(&ctx.cond);
    pthread_mutex_destroy(&ctx.cond_lock);
}

        第18行,我们调用zoo_wexists方法监控节点状态。如果监控点设置成功,则等待上文中创建的条件变量。该条件变量在zoo_wexists参数的回调函数中被设置

void lock_watcher(zhandle_t* zh, int type, int state,
        const char* path, void* watcherCtx)
{
    //sleep(1);
    //fprintf(stdout, "lock_watcher: %s %d, %d\n", path, type, state); 
    if (type == ZOO_DELETED_EVENT) {
        //fprintf(stdout, "delete %s\n", path);
        watchctx_t* ctx = (watchctx_t*)watcherCtx;
        pthread_cond_signal(&ctx->cond);
    }
    else {
        //fprintf(stdout, "add %s\n", path);
	struct Stat stat;
	zoo_wexists(zh, path, lock_watcher, watcherCtx, &stat);
    }
}

        zookeeper的监控点是一次性的,即如果一次被触发则不再触发。于是在这个回调函数中,如果我们发现节点不是被删除——监控到它被其他Client创建,就再次注册该监控点。

        这样我们就使用了相对高大上的事件通知机制。但是问题随之而来,这种方式会引起惊群现象。即在一个Client释放锁后,其他Client都会尝试去调用zoo_create去获取锁,这会造成系统抖动很强烈。

        我们继续改进锁的设计。现在我们换个思路,让这些Client排着队去尝试获取锁。如果做呢?

        每个Client在Server上按顺序创建一个节点,并监控比自己小的那个节点。如果比自己小的那个节点(最接近自己的)被删除了,则意味着:

  1. 可能排在“我”前面的Client和Server断开了连接,那么此时应该还没轮到“我”,于是“我”要找到此时比“我”小的、最邻近的节点路径,然后去监控这个节点。
  2. 可能排在“我”前面的所有Client都获得过锁了,并且它们都释放了,现在轮到“我”来获得锁了。

        采用这种方式,我们可以最大限度的减少获取锁的行为。但是这对zookeeper提出了一个要求,我们可以原子性的创建包含单调递增数字的路径的节点。非常幸运的是,zookeeper的确提供了这样的方式——顺序节点。

void thread_routine(void* ptr) {

    zhandle_t* zh = init();
    if (!zh) {
        return;
    }

    watchctx_t ctx;
    if (0 != init_watchctx(&ctx)) {
        return;
    }

#define ROOT_PATH "/test_seq_lock"
    const char* root_path = ROOT_PATH;
    const char* lock_data = "lock";
    const char* lock_path = ROOT_PATH"/0";
    int ret = ZNODEEXISTS;
    do {
        const int seq_path_lenght = 512;
        char sequence_path[seq_path_lenght];
        ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
            &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL | ZOO_SEQUENCE,
            sequence_path, sizeof(sequence_path) - 1);
        if (ZNODEEXISTS == ret) {
            //fprintf(stderr, "lock exist\n");
        }
        else if (ZOK == ret) {
            ret = wait_for_lock(zh, &ctx, root_path, sequence_path);
            if (ZOK == ret) {
                pthread_t pid = pthread_self();
                fprintf(stdout, "%lu %s get lock\n", (long long)pid, sequence_path);
                sleep(0.1);
            }
            zoo_delete(zh, sequence_path, -1);
        }   
        else if (ZNONODE == ret) {
            ret = zoo_create(zh, root_path, lock_data, strlen(lock_data),
                &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
            if (ZNODEEXISTS != ret && ZOK != ret) {
                fprintf(stderr, "Error %d for %s\n", ret, "create root path");
                break;
            }
        }
        else {
            fprintf(stderr, "Error %d for %s\n", ret, "create");
        }

    } while (1);

    zookeeper_close(zh);
    pthread_cond_destroy(&ctx.cond);
    pthread_mutex_destroy(&ctx.cond_lock);
}

        第23行,我们给zoo_create方法传入了一个路径空间用于接收创建的有序节点路径。第28行,我们将这个路径连同条件变量一起传入自定义函数wait_for_lock去等待获得锁的时机。

int search_watch_neighbor(zhandle_t* zh, const char* root_path,
                            const char* cur_name,
                            char* neighbor_name, int len)
 {
    struct String_vector strings;
    int rc = zoo_get_children(zh, root_path, 0, &strings);
    if (ZOK != rc || 0 == strings.count) {
        return ZNOTEMPTY;       
    }

    int neighbor = -1;
    for (int i = 0; i < strings.count; i++) {
        int cmp = strcmp(cur_name, strings.data[i]);
        if (cmp <= 0) {
            continue;
        }

        if (-1 == neighbor) {
            neighbor = i;
            continue;
        }

        cmp = strcmp(strings.data[neighbor], strings.data[i]);
        if (cmp >= 0) {
            continue;
        }
        neighbor = i;
    }

    if (-1 == neighbor) {
        *neighbor_name = 0;
        return ZNONODE;
    }

    int neighbor_name_len = strlen(strings.data[neighbor]);
    if (len < neighbor_name_len - 1) {
        *neighbor_name = 0;
        return ZBADARGUMENTS;
    }

    memcpy(neighbor_name, strings.data[neighbor], neighbor_name_len);
    *(neighbor_name + neighbor_name_len) = '\0';
    fprintf(stdout, "********\n self: %s neighbor:%s\n*********\n", cur_name, neighbor_name);
    return ZOK;
}

void neighbor_watcher(zhandle_t* zh, int type, int state,
        const char* path, void* watcherCtx)
{
    if (type == ZOO_DELETED_EVENT) {
        watchctx_t* ctx = (watchctx_t*)watcherCtx;
        const int path_len_max = 512;
        char neighbor_name[path_len_max];
        int ret = search_watch_neighbor(zh, ctx->root_path, 
            ctx->cur_name, neighbor_name, sizeof(neighbor_name));
        if (ZNONODE == ret) {
            pthread_cond_signal(&ctx->cond);
        }
        else if (ZOK == ret) {
            char neighbor_path[path_len_max];
            sprintf(neighbor_path, "%s/%s", ctx->root_path, neighbor_name);
            struct Stat stat;
            zoo_wexists(zh, neighbor_path, neighbor_watcher, watcherCtx, &stat);
        }
    }
}

int wait_for_lock(zhandle_t* zh, watchctx_t* ctx, const char* root_path, const char* sequence_path) {
    strcpy(ctx->root_path, root_path);
    strcpy(ctx->cur_name, sequence_path + strlen(root_path) + 1);

    const int path_len_max = 512;
    char neighbor_name[path_len_max];
    int status = ZOK;

    do {
        int ret = search_watch_neighbor(zh, ctx->root_path, 
            ctx->cur_name, neighbor_name, sizeof(neighbor_name));

        char neighbor_path[path_len_max];
        sprintf(neighbor_path, "%s/%s", root_path, neighbor_name);

        pthread_t pid = pthread_self();
        fprintf(stdout, "%lu get neighbor info: %d %s\n", (long long)pid, ret, neighbor_path);

        if (ZNONODE == ret) {
            status = ZOK;
            break;
        }
        else if (ZOK == ret) {
            struct Stat stat;
            if (ZOK == zoo_wexists(zh, neighbor_path, neighbor_watcher, ctx, &stat)) {
                pthread_mutex_lock(&ctx->cond_lock);
                pthread_cond_wait(&ctx->cond, &ctx->cond_lock);
                pthread_mutex_unlock(&ctx->cond_lock);
            }
            else {
                continue;
            }
        }
        else {
            status = ZSYSTEMERROR;
            break;
        }
        
    } while(1);
    return status;
}

        再结合main函数的实现,两种不方式设计的分布式锁都可以运行起来

#define countof(x) sizeof(x)/sizeof(x[0])

int main(int argc, const char *argv[]) {
    const int thread_num = 3;
    pthread_t ids[thread_num];

    for (int i = 0; i < countof(ids); i++) {
        pthread_create(&ids[i], NULL, (void*)thread_routine, NULL);
    }

    for (int i = 0; i < countof(ids); i++) {
        pthread_join(ids[i], NULL);
    }

    return 0;
}

        将上述文件保存为lock_test.c,然后调用下面的指令编译

gcc -o lock_test lock_test.c -I/home/work/fangliang/zookeeper-3.4.11/src/c/generated -I/home/work/fangliang/zookeeper-3.4.11/src/c/include -L/home/work/fangliang/zookeeper-3.4.11/src/c/.libs -lzookeeper_mt -DTHREADED -std=c99

        关于zookeeper库的编译,网上有很多。我编译起来还算顺利,只是在找不到so时候使用下面指令指定下查找路径

export LD_LIBRARY_PATH=/home/work/fangliang/zookeeper-3.4.11/src/c/.libs:$LD_LIBRARY_PATH

        参考资料

  • https://www.cnblogs.com/xybaby/p/6871764.html
  • http://lib.csdn.net/article/hadoop/6665
  • https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/
  • http://www.cnblogs.com/haippy/archive/2013/02/21/2920280.html
  • http://zookeeper.sourcearchive.com/documentation/3.2.2plus-pdfsg3/zookeeper_8h.html
  • 《Zookeeper分布式过程协同技术详解》
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年02月27日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档