前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python与ZooKeeper集群连接

Python与ZooKeeper集群连接

作者头像
py3study
发布2020-01-08 17:16:06
1.7K0
发布2020-01-08 17:16:06
举报
文章被收录于专栏:python3python3

由于项目的需要,需要学习Python客户端连接ZooKeeper集群,并实现创建临时节点、获得指定的路径下的信息、监听子节点变化的功能。

环境配置

ZooKeeper集群的安装可以参考http://blog.csdn.net/mrbcy/article/details/54767484

使用下面的命令安装kazoo

代码语言:javascript
复制
pip install kazoo

基本使用

这一部分可参考官方文档:http://kazoo.readthedocs.io/en/latest/basic_usage.htm

监听子节点变化

下面的代码实现了创建一个临时、顺序的节点,并且可以监听子节点的变化。

代码语言:javascript
复制
#-*- coding: utf-8 -*-
import time
from kazoo.client import KazooClient
from kazoo.recipe.watchers import ChildrenWatch




class ValidatorDetector:

    def __init__(self):
        self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
        self.validator_children_watcher = ChildrenWatch(client=self.zk,path='/mproxy/validators',func=self.validator_watcher_fun)
        self.zk.start()

    def validator_watcher_fun(self,children):
        print "The children now are:", children

    def create_node(self):
        self.zk.create('/mproxy/validators/validator',b'validator_huabei_1',ephemeral=True,sequence=True,makepath=True)

    def __del__(self):
        self.zk.close()





if __name__ == '__main__':
    detector = ValidatorDetector()
    detector.create_node()
    time.sleep(10)

ZooKeeper原生提供了监听节点变化及值的变化的API。关于这一部分可以参考http://blog.csdn.net/mrbcy/article/details/54790758。但是这些API只能生效一次,一旦被触发过一次以后就不会再触发了,除非再次注册。而kazoo则在这个基础上封装了更上层的API,可以持续的触发。这就是上面的ChildrenWatch,除此之外kazoo还封装了一个DataWatch,用于监听数据的变化。下面我们也会用到。

kazoo还实现了自动续订功能,使得在会话过期后我们不需要再次初始化ZooKeeper客户端(这里可以参考http://blog.csdn.net/mrbcy/article/details/55062713),也是非常方便的。

注册验证器

有了上面的知识就可以做一个注册类和一个监测类了。

代码语言:javascript
复制
#-*- coding: utf-8 -*-
import threading
import time
from kazoo.client import KazooClient
from kazoo.protocol.states import KazooState

class InfoKeeper(threading.Thread):
    def __init__(self,register):
        threading.Thread.__init__(self)
        self.register=register

    def run(self):
        time.sleep(0.25)
        if self.register.zk_node is None:
            print "create method has not been called"
            return
        check_result = self.register.zk.exists(self.register.validator_path)
        if check_result is None:
            # redo the regist
            print "redo the regist"
            self.register.regist()
        else:
            print "the path remain exists"

class ValidatorRegister:
    def __init__(self):
        self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
        self.zk_node = None
        self.validator_path = '/mproxy/validators/'
        self.zk.add_listener(self.conn_state_watcher)
        self.zk.start()


    def __del__(self):
        self.zk.close()

    def regist(self):
        self.zk_node = self.zk.create(self.validator_path + 'validator',bytes('validator_huabei_1'),ephemeral=True,sequence=True,makepath=True)

    def close(self):
        self.zk.stop()
        self.zk.close()

    def conn_state_watcher(self, state):
        if state == KazooState.CONNECTED:
            print "Now connected"

            if self.zk_node is None:
                print "create method has not been called"
                return
            info_keeper = InfoKeeper(self)
            info_keeper.start()
        elif state == KazooState.LOST:
            print "Now lost"
        else:
            print "Now suspended"

监测类:

代码语言:javascript
复制
#-*- coding: utf-8 -*-
import time
from kazoo.client import KazooClient
from kazoo.recipe.watchers import ChildrenWatch




class ValidatorDetector:

    def __init__(self):
        self.validator_path = '/mproxy/validators/'
        self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
        self.validator_children_watcher = ChildrenWatch(client=self.zk,path=self.validator_path,func=self.validator_watcher_fun)
        self.zk.start()

    def validator_watcher_fun(self,children):
        for child in children:
            validator_name = self.zk.get(path=self.validator_path + str(child))
            print validator_name[0]
        print "The children now are:", children


    def __del__(self):
        self.zk.close()

注册类这里稍微复杂了一点,做了一个在会话过期后重新注册的机制,如果会话过期,重新注册之前的注册信息。

监听子节点值的变化

嗯,这个需求仔细想过后可以通过监听子节点的变化来代替,所以暂时不实现了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 环境配置
  • 基本使用
  • 监听子节点变化
  • 注册验证器
    • 监听子节点值的变化
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档