前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python codis集群客户端(二) - 基于zookeeper对实例创建与摘除

python codis集群客户端(二) - 基于zookeeper对实例创建与摘除

作者头像
用户1225216
发布2018-03-05 15:01:20
1.7K2
发布2018-03-05 15:01:20
举报
文章被收录于专栏:扎心了老铁扎心了老铁

 在这一篇中我们实现了不通过zk来编写codis集群proxys的api,

如果codis集群暴露zk给你的话,那么就方便了,探活和故障摘除与恢复codis集群都给你搞定了,你只需要监听zookeeper中实例的状态就好了。

下面看我的实现。

1、CodisByZKPool.py

这里通过zk读取并初始化pool_shards,简单说一下如何故障摘除和恢复

1)我们监听zk中节点状态改变,当发现某个实例对应的节点状态变化了,比如DELETE了,那么我们认为这个实例挂了,我们就会重新_create_pool刷新shards列表,摘除故障实例。

2)同样,当我们发现节点CREATE,就是新增了实例,或者实例从崩溃中恢复了,我们也会重新_create_pool刷新shards列表,新增实例。

代码语言:javascript
复制
# -*- coding:utf-8 -*-
import redis
import logging
from kazoo.client import KazooClient
from Podis import Podis
from PickUp import RandomPickUp, PickUp

logger = logging.getLogger(__name__)


class CodisByZKPool(object):

    def __init__(self, zk_config):
        self._pool_shards = []
        self.zk_config = zk_config
        self.zk = self._init_zk()

    def _init_zk(self):
        return KazooClient(hosts=self.zk_config.get('hosts'), timeout=self.zk_config.get('timeout'))

    def _create_pool(self):
        try:
            if not self.zk.connected:
                self.zk.start()
            address_list = self.zk.get_children(self.zk_config.get('path'), watch=self._watch_codis_instances)
            for address in address_list:
                host = address.split(':')[0]
                port = address.split(':')[1]
                self._pool_shards.append(
                    Podis(
                        redis.ConnectionPool(
                            host=host, port=port, db=0,
                            password=None,
                            max_connections=None
                        )
                    )
                )
            if len(self._pool_shards) == 0:
                raise Exception('create pool failure!')
        except Exception, ex:
            raise
        finally:
            self.zk.stop()

    def _watch_codis_instances(self, event):
        if event.type == "CREATED" and event.state == "CONNECTED":
            self._create_pool()
        elif event.type == "DELETED" and event.state == "CONNECTED":
            self._create_pool()
        elif event.type == "CHANGED" and event.state == "CONNECTED":
            self._create_pool()
        elif event.type == "CHILD" and event.state == "CONNECTED":
            self._create_pool()
        else:
            logger.error('failure: not cover this event - %s'.format(event.type))

    def get_connection(self, pick_up=None):
        if isinstance(pick_up, PickUp):
            codisPool = pick_up.pick_up(self._pool_shards)
        else:
            pick_up = RandomPickUp()
            codisPool = pick_up.pick_up(self._pool_shards)
        return codisPool

    def get_availables(self):
        return self._pool_shards

2、负载均衡PickUp.py

上一篇一样,这里就不多说了。

代码语言:javascript
复制
# -*- coding:utf-8 -*-
import abc
import uuid
import threading

class PickUp(object):

    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def __init__(self):
        pass

    @abc.abstractmethod
    def pick_up(self, pool_list):
        return


class RandomPickUp(PickUp):
    def __init__(self):
        PickUp.__init__(self)

    def pick_up(self, pool_list):
        pool_size = len(pool_list)
        index = abs(hash(uuid.uuid4())) % pool_size
        pool = pool_list[index]
        print "RandomPickUp, 拿到第", index, "个pool"
        return pool


class RoundRobinPickUp(PickUp):

    def __init__(self):
        PickUp.__init__(self)
        self.index = 0
        self.round_robin_lock = threading.Lock()

    def pick_up(self, pool_list):
        with self.round_robin_lock:
            pool_size = len(pool_list)
            self.index += 1
            index = abs(self.index) % pool_size
            pool = pool_list[index]
            print "RoundRobinPickUp, 拿到第", index, "个pool"
            return pool

3、配置文件

这里就只用zk_config就可以了,我们认为在zk中已经有所有的codisproxy实例的address了。

代码语言:javascript
复制
codis_config = {
    'addrs': '100.90.186.47:3000,100.90.187.33:3000'
}

zk_config = {
    'hosts': '10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181',
    'timeout': 10,
    'path': '/codis/instances'
}

4、链接类Podis.py

代码语言:javascript
复制
# -*- coding:utf-8 -*-
import redis
import logging
import traceback

logger = logging.getLogger(__name__)


def redis_getter(func):
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
            return result or None
        except Exception, ex:
            logger.error(traceback.format_exc())
            raise
    return wrapper


def redis_setter(func):
    def wrapper(*args, **kwargs):
        try:
            func(*args, **kwargs)
            return True
        except Exception, ex:
            logger.error(traceback.format_exc())
            raise
    return wrapper


class Podis(object):

    def __init__(self, pool):
        self._connection = redis.StrictRedis(connection_pool=pool)

    @redis_getter
    def ping(self):
        return self._connection.ping()

    @redis_getter
    def get(self, key):
        return self._connection.get(key)

    @redis_setter
    def set(self, key, value):
        self._connection.set(key, value)

    @redis_setter
    def lpush(self, key, *value):
        self._connection.lpush(key, *value)

    @redis_getter
    def lpop(self, key):
        return self._connection.lpop(key)

    @redis_getter
    def lrange(self, key, start, end):
        return self._connection.lrange(key, start, end)

    @redis_setter
    def sadd(self, key, *value):
        self._connection.sadd(key, *value)

    @redis_setter
    def srem(self, key, *value):
        self._connection.srem(key, *value)

    @redis_getter
    def zrange(self,key,start,end):
        return self._connection.zrange(key,start,end)

    @redis_getter
    def zrevrange(self,key,start,end):
        return self._connection.zrevrange(key,start,end,withscores=True)

    @redis_getter
    def zscore(self,key,*value):
        return self._connection.zscore(key,value)

    @redis_setter
    def zadd(self,key,score,*value):
        self._connection.zadd(key,score,value)

    @redis_getter
    def smembers(self, key):
        return self._connection.smembers(key)

    @redis_getter
    def hgetall(self, key):
        return self._connection.hgetall(key)

    @redis_getter
    def hget(self, key, name):
        return self._connection.hget(key, name)

    @redis_getter
    def hkeys(self, key):
        return self._connection.hkeys(key)

    @redis_setter
    def hset(self, key, name, value):
        self._connection.hset(key, name, value)

    @redis_setter
    def hmset(self, name, mapping):
        self._connection.hmset(name, mapping)

    @redis_setter
    def hdel(self, key, name):
        self._connection.hdel(key, name)

    @redis_setter
    def delete(self, *key):
        self._connection.delete(*key)

    # codis不支持
    @redis_getter
    def keys(self, pattern):
        return self._connection.keys(pattern)

    @redis_setter
    def expire(self, key, time):
        return self._connection.expire(key, time)

    @redis_getter
    def ttl(self, key):
        return self._connection.ttl(key)

5、例子

代码语言:javascript
复制
import sys
sys.path.append('../')
import time
import threading
from pycodis.CodisConfig import zk_config
from pycodis.CodisByZKPool import CodisByZKPool
from pycodis.PickUp import RoundRobinPickUp

codis_pool1 = CodisByZKPool(zk_config)
print '------1-------'
pick_up1 = RoundRobinPickUp()
print '------2-------'
codis_pool2 = CodisByZKPool(zk_config)
print '------3-------'
pick_up2 = RoundRobinPickUp()
print '------4-------'


def func(i):
    for i in range(10):
        podis1 = codis_pool1.get_connection(pick_up=pick_up1)
        podis2 = codis_pool2.get_connection(pick_up=pick_up2)
        podis1.delete(i)
        podis2.delete(i)
        time.sleep(1)

thread_list = []
for i in range(100):
    thread_list.append(threading.Thread(target=func, args=[i]))

for thread in thread_list:
    thread.setDaemon(True)
    thread.start()

time.sleep(10)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-09-06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档