前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kazoo Python Zookeeper 选主

Kazoo Python Zookeeper 选主

作者头像
用户1225216
发布2018-03-05 14:52:50
1.8K2
发布2018-03-05 14:52:50
举报
文章被收录于专栏:扎心了老铁扎心了老铁

本文讲述基于zookeeper选主与故障切换的方法。我们的例子使用的是python。

使用的库是kazoo,安装方式

代码语言:javascript
复制
pip install kazoo 

应用场景:

  • 多个实例部署,但不是“去中心化”的部署方式;
  • 有且只有一个节点作为master,履行master的职责,在例子中是注册调度器;
  • 其他实例作为slave,不提供调度功能,但是在master节点挂掉之后,可以重新进行选主调度。

1、注册调度器

我们只给出伪代码,简单的打印调度器注册结果。

代码语言:javascript
复制
# -*- coding:utf-8 -*-


# 调度器注册和关闭
# 模拟主节点的职责
class MyScheduler(object):

    # 注册调度器
    def init_scheduler(self):
        print '########## 开启调度器成功 ############'

    # 关闭调度器
    def stop_scheduler(self):
        print '########## 关闭调度器成功 ############'

2、选主与故障切换代码

1)使用add_listener注册监听器,监听zookeeper会话超时,Session Expired,否则在会话超时的场景中会出现锁不一致的问题,可以参看这篇文章

  • 会话超时之后,我们要重新建立Session,在我们的例子里,会循环直到Session重新建立。
  • 重新注册Watcher,因为Watcher会随着Session失效而失效。在我们的例子里,通过执行get_children重新注册了Watcher。

2)向zookeeper注册自己,使用参数makepath=True级联创建节点;使用参数ephemeral=True, sequence=True参数,也就是创建临时有序节点:

  • 当实例挂掉,session断开,注册的节点会自行消失
  • 有序节点,会节点后添加一个有序编号

3)注册Watcher,观察/dmonitor/master的子节点,当发生以下事件时:

  • Created:新增子节点
  • Deleted:删除子节点
  • Changed:子节点数据变化
  • Child:子节点的下一级节点

进行重新选主。

代码语言:javascript
复制
# -*- coding:utf-8 -*-

import socket
import traceback
from kazoo.client import KazooClient
from kazoo.client import KazooState
from MyScheduler import MyScheduler


class HAMaster(object):

    def __init__(self):
        self.path = '/dmonitor/master'
        self.scheduler = MyScheduler()
        self.zk = KazooClient('10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181', timeout=10)
        self.zk.start()
        self.zk.add_listener(self.my_listener)
        self.is_leader = False

    def create_instance(self):
        instance = self.path + '/' + socket.gethostbyname(socket.gethostname()) + '-'
        self.zk.create(path=instance, value="", ephemeral=True, sequence=True, makepath=True)

    # 选主逻辑: master节点下, 所有ephemeral+sequence类型的节点中, 编号最大的获得领导权.
    def choose_master(self):
        print "########## 选主开始 ############"
        instance_list = self.zk.get_children(path=self.path, watch=self.my_watcher)
        instance = max(instance_list).split('-')[0]
        # 本实例获得领导权
        if instance == socket.gethostbyname(socket.gethostname()):
            if not self.is_leader:
                self.scheduler.init_scheduler()
                self.is_leader = True
                print "######### 我被选为master, 我以前不是master, 注册调度 ##########"
            else:
                print "######### 我被选为master, 我以前是master, 不再注册调度 ##########"
        # 本实例没有获得领导权
        else:
            if self.is_leader:
                self.scheduler.stop_scheduler()
                self.is_leader = False
                print "######### 我被选为slave, 我以前不是slave, 关闭调度 ##########"
            else:
                print "######### 我被选为slave, 我以前是slave, 不再关闭调度 ##########"
        print "########## 选主完成 ############"

    def my_listener(self, state):
        if state == KazooState.LOST:
            print "########## 会话超时:KazooState.LOST ############"
            while True:
                try:
                    self.create_instance()
                    self.zk.get_children(path=self.path, watch=self.my_watcher)
                    print "########## 会话超时:重建会话完成! ############"
                    break
                except Exception, _:
                    traceback.print_exc()
        elif state == KazooState.SUSPENDED:
            print "########## 会话超时:KazooState.SUSPENDED ############"
        elif state == KazooState.CONNECTED:
            print "########## 会话超时:KazooState.CONNECTED ############"
        else:
            print "########## 会话超时:非法状态 ############"

    def my_watcher(self, event):
        if event.state == "CONNECTED" and event.type == "CREATED" or event.type == "DELETED" or event.type == "CHANGED" or event.type == "CHILD":
            print "########## 监听到子节点变化事件 ############"
            self.choose_master()
        else:
            print "########## 监听到未识别的事件 ############"

3、测试一下

测试代码如下

代码语言:javascript
复制
# -*- coding:utf-8 -*-
import time
from HAMaster import HAMaster

ha = HAMaster()
# 向zk注册自己
ha.create_instance()
# 进行选主
ha.choose_master()

while 1:
    time.sleep(10)

运行这个脚本,模拟如下几个场景:

1)初始选主

 顺序在两台机器上启动测试脚本。

client10

代码语言:javascript
复制
[data_monitor@bigdata-arch-client10 zookeeper]$ python run.py 

client11

代码语言:javascript
复制
[data_monitor@bigdata-arch-client11 zookeeper]$ python run.py 

client10输出

代码语言:javascript
复制
[data_monitor@bigdata-arch-client10 zookeeper]$ python run.py 
########## 选主开始 ############
########## 开启调度器成功 ############
######### 我被选为master, 我以前不是master, 注册调度 ##########
########## 选主完成 ############
########## 监听到子节点变化事件 ############
########## 选主开始 ############
######### 我被选为master, 我以前是master, 不再注册调度 ##########
########## 选主完成 ############

client11输出

代码语言:javascript
复制
[data_monitor@bigdata-arch-client11 zookeeper]$ python run.py 
########## 选主开始 ############
######### 我被选为slave, 我以前是slave, 不再关闭调度 ##########
########## 选主完成 ############

可以看到,client10倍选为主节点并注册调度器,client11作为slave节点。

2)主实例挂掉

我们把client10上的实例kill掉。

client10上不再输出

client11上总体输出如下

代码语言:javascript
复制
[data_monitor@bigdata-arch-client11 zookeeper]$ python run.py 
########## 选主开始 ############
######### 我被选为slave, 我以前是slave, 不再关闭调度 ##########
########## 选主完成 ############
########## 监听到子节点变化事件 ############
########## 选主开始 ############
########## 开启调度器成功 ############
######### 我被选为master, 我以前不是master, 注册调度 ##########
########## 选主完成 ############

可以看到监听到节点的变化,并进行了重新选主,client11被选为主节点(只剩他了)并注册了调度器。

tips:观察到节点变化的实效性是通过timeout=10参数控制的,也就是超过10s session不能维持就会认为实例挂了,zookeeper会删除节点。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-10-14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档