python codis集群客户端(一) - 基于客户端daemon探活与服务列表维护

在使用codis时候,我们遇到的场景是,公司提供了HA的Proxy(例如N个),但是不暴露zookeeper(也就是说没有codis后端服务列表)。

如果暴露zk的话,可以看这一篇

要求在开发客户端api的过程中,自己进行探活&故障摘除&负载均衡。

我这里做了一个简单的实现,提供给大家参考。本实例支持使用在server或者daemon中。

我们的实现叫做pycodis。

1、核心文件CodisPool.py

在这个文件里,实现了多服务端实例链接,探活与故障摘除,负载均衡策略(随机&roundrobin)。

1)探活

由于公司不暴露CodisProxy的zookeeper,我们在编写客户端程序的时候无法获取活跃的codisProxy服务列表。我们不得不自己去探活并且保存活跃的服务列表。

我们通过在初始化CodisPool实例时候,启动了后台线程_check_available_backgroud,去轮询配置的codisProxy列表,我们的探活与摘除满足以下3个要求:

a)如果在一定的阈值时间段内(默认3s),某个codisProxy始终无法提供服务,就暂时将它从codisProxy列表中摘除;

b)被摘除的codisProxy实例无法通过get_connection得到,但是在codisProxy列表中保留它,让它可以在满足条件的情况下复起;

c)当被摘除的codisProxy恢复了,就把它放到可用codisProxy列表中,这样通过get_connetion又能拿到它。

注:这里通过get_connection拿到的proxy,其实现方式是redis链接池。

2)负载均衡

我们通过get_connection这个函数在proxy列表中得到一个可用链接,那么获取可用链接的负载均衡算法是怎样的呢?

PickUp抽象类,定义了负载均衡类需要实现的方法,pick_up()。

我们实现了两种负载均衡算法:

a)RandomPickUp,随机负载均衡

b)RoundRobinPickUp,轮询负载均衡

3)单例模式的保证

为了不至于在程序运行时创建很多的链接池实例,这是违反设计初衷的,链接池就没有意义了,我们需要实现为线程安全的单例模式。

在CodisPool实现中只保证了不会额外建立太多的链接池--我们在__init__中一次性创建好了Podis实例,并且放入到proxy列表中。单例模式留待使用中去实现,最简单的方式是,在python的module中先把CodisPool的实例建立好。然后其他的线程在去访问。可以参考我们的example实例。

# -*- coding:utf-8 -*-
import abc
import uuid
import time
import logging
import traceback
import threading
import redis
from Podis import Podis

logger = logging.getLogger(__name__)


class CodisPool(object):

    def __init__(self, codis_config):
        self._pool_shards = []
        self._availables = []
        self._connection = None
        self._create_pool(codis_config)
        self._check_available_backgroud()

    def _check_available_backgroud(self):
        bg = Background(self._check_pool_shards)
        bg.start()

    def _create_pool(self, codis_config):
        address_list = codis_config.get('addrs').split(',')
        for address in address_list:
            host = address.split(':')[0]
            port = address.split(':')[1]
            self._pool_shards.append(
                Podis(
                    redis.ConnectionPool(
                        host=host, port=port, db=0,
                        password=codis_config.get('password'),
                        max_connections=codis_config.get('max_connections')
                    )
                )
            )
            self._availables.append(True)
        if len(self._pool_shards) == 0:
            logger.error('创建codis链接池失败')
            raise Exception('创建codis链接池失败')

    def _check_pool_shards(self):
        while True:
            self._pool_shards_is_available()

    def _pool_shards_is_available(self, retry_num=3):
        i = 0
        for pool in self._pool_shards:
            try:
                retry = retry_num
                go_on = True
                while go_on and retry > 0:
                    try:
                        pong = pool.ping()
                        if not pong:
                            retry -= 1
                        else:
                            go_on = False
                    except Exception, ex:
                        retry -= 1
                        raise
                    finally:
                        time.sleep(1)
                if retry <= 0:
                    self._availables[i] = False
                else:
                    self._availables[i] = True
            except Exception, ex:
                logger.error(traceback.format_exc())
            finally:
                i += 1

    def _get_available_shards(self):
        i = 0
        available_shards = []
        for shard in self._pool_shards:
            if self._availables[i]:
               available_shards.append(shard)
            i += 1
        return available_shards

    def get_connection(self, pick_up=None):
        if isinstance(pick_up, PickUp):
            codisPool = pick_up.pick_up(self._get_available_shards())
        else:
            pick_up = RandomPickUp()
            codisPool = pick_up.pick_up(self._get_available_shards())
        return codisPool

    def get_pool_shards(self):
        return self._pool_shards

    def get_availables(self):
        return self._get_available_shards()


class Background(object):
    def __init__(self, target, daemon=True):
        self.daemon = daemon
        self.thread = threading.Thread(target=target)

    def start(self):
        self.thread.setDaemon(self.daemon)
        self.thread.start()


class PickUp(object):

    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def __init__(self):
        pass

    @abc.abstractmethod
    def pick_up(self, pool_list):
        return


class RandomPickUp(PickUp):
    def __init__(self):
        PickUp.__init__(self)

    def pick_up(self, pool_list):
        pool_size = len(pool_list)
        index = abs(hash(uuid.uuid4())) % pool_size
        pool = pool_list[index]
        print "RandomPickUp, 拿到第", index, "个pool"
        return pool


class RoundRobinPickUp(PickUp):

    def __init__(self):
        PickUp.__init__(self)
        self.index = 0
        self.round_robin_lock = threading.Lock()

    def pick_up(self, pool_list):
        with self.round_robin_lock:
            pool_size = len(pool_list)
            self.index += 1
            index = abs(self.index) % pool_size
            pool = pool_list[index]
            print "RoundRobinPickUp, 拿到第", index, "个pool"
            return pool

2、Podis,实际的句柄资源

在CodisPool获得的句柄就是本类的一个实例。

# -*- coding:utf-8 -*-
import redis
import logging
import traceback

logger = logging.getLogger(__name__)


def redis_getter(func):
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
            return result or None
        except Exception, ex:
            logger.error(traceback.format_exc())
            raise
    return wrapper


def redis_setter(func):
    def wrapper(*args, **kwargs):
        try:
            func(*args, **kwargs)
            return True
        except Exception, ex:
            logger.error(traceback.format_exc())
            raise
    return wrapper


class Podis(object):

    def __init__(self, pool):
        self._connection = redis.StrictRedis(connection_pool=pool)

    @redis_getter
    def ping(self):
        return self._connection.ping()

    @redis_getter
    def get(self, key):
        return self._connection.get(key)

    @redis_setter
    def set(self, key, value):
        self._connection.set(key, value)

    @redis_setter
    def lpush(self, key, *value):
        self._connection.lpush(key, *value)

    @redis_getter
    def lpop(self, key):
        return self._connection.lpop(key)

    @redis_getter
    def lrange(self, key, start, end):
        return self._connection.lrange(key, start, end)

    @redis_setter
    def sadd(self, key, *value):
        self._connection.sadd(key, *value)

    @redis_setter
    def srem(self, key, *value):
        self._connection.srem(key, *value)

    @redis_getter
    def zrange(self,key,start,end):
        return self._connection.zrange(key,start,end)

    @redis_getter
    def zrevrange(self,key,start,end):
        return self._connection.zrevrange(key,start,end,withscores=True)

    @redis_getter
    def zscore(self,key,*value):
        return self._connection.zscore(key,value)

    @redis_setter
    def zadd(self,key,score,*value):
        self._connection.zadd(key,score,value)

    @redis_getter
    def smembers(self, key):
        return self._connection.smembers(key)

    @redis_getter
    def hgetall(self, key):
        return self._connection.hgetall(key)

    @redis_getter
    def hget(self, key, name):
        return self._connection.hget(key, name)

    @redis_getter
    def hkeys(self, key):
        return self._connection.hkeys(key)

    @redis_setter
    def hset(self, key, name, value):
        self._connection.hset(key, name, value)

    @redis_setter
    def hmset(self, name, mapping):
        self._connection.hmset(name, mapping)

    @redis_setter
    def hdel(self, key, name):
        self._connection.hdel(key, name)

    @redis_setter
    def delete(self, *key):
        self._connection.delete(*key)

    # codis不支持
    @redis_getter
    def keys(self, pattern):
        return self._connection.keys(pattern)

    @redis_setter
    def expire(self, key, time):
        return self._connection.expire(key, time)

    @redis_getter
    def ttl(self, key):
        return self._connection.ttl(key)

3、配置文件CodisConfig.py

这里我没有对最大连接数,超时时间等等做配置,你可以根据你的场景自行添加。

codis_config = {
    'addrs': '100.90.186.47:3000,100.90.187.33:3000'
}

4、单测和使用

我们模拟在并发场景下,对资源的获得和释放情况。

import time
import threading
from pycodis.CodisConfig import codis_config
from pycodis.CodisPool import CodisPool, RoundRobinPickUp

codis_pool1 = CodisPool(codis_config)
print '------1-------'
pick_up1 = RoundRobinPickUp()
print '------2-------'
codis_pool2 = CodisPool(codis_config)
print '------3-------'
pick_up2 = RoundRobinPickUp()
print '------4-------'


def func(i):
    for i in range(10):
        podis1 = codis_pool1.get_connection(pick_up=pick_up1)
        podis2 = codis_pool2.get_connection(pick_up=pick_up2)
        podis1.delete(i)
        podis2.delete(i)
        time.sleep(1)

thread_list = []
for i in range(100):
    thread_list.append(threading.Thread(target=func, args=[i]))

for thread in thread_list:
    thread.setDaemon(True)
    thread.start()

time.sleep(10)

可以看到打印的信息,每次都不一样

------1-------
------2-------
------3-------
------4-------
RoundRobinPickUp, 拿到第 1 个pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 01 个pool
 个pool
RoundRobinPickUp, 拿到第 0 个poolRoundRobinPickUp, 拿到第
 1 个pool
RoundRobinPickUp, 拿到第 1RoundRobinPickUp, 拿到第  个pool
0 个pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第0  个pool
1 个pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 10 个pool
 RoundRobinPickUp, 拿到第个pool
 1RoundRobinPickUp, 拿到第  个pool
RoundRobinPickUp, 拿到第 00  个pool个pool

RoundRobinPickUp, 拿到第 1RoundRobinPickUp, 拿到第  个pool
1 个poolRoundRobinPickUp, 拿到第
0RoundRobinPickUp, 拿到第 0 个pool
 RoundRobinPickUp, 拿到第 1 个pool
个pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 01 个pool
个pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 10  个pool个pool

RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 1 个pool
0 RoundRobinPickUp, 拿到第 个pool
RoundRobinPickUp, 拿到第 0 个pool1
 RoundRobinPickUp, 拿到第个pool
1 个poolRoundRobinPickUp, 拿到第
RoundRobinPickUp, 拿到第 0 0个pool
 RoundRobinPickUp, 拿到第 个pool1
 个poolRoundRobinPickUp, 拿到第
RoundRobinPickUp, 拿到第 10 个pool 个pool

RoundRobinPickUp, 拿到第 0RoundRobinPickUp, 拿到第  个pool
1 个poolRoundRobinPickUp, 拿到第 1 个pool

RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 0 个pool
0 个pool
RoundRobinPickUp, 拿到第RoundRobinPickUp, 拿到第  1 个pool
1 个poolRoundRobinPickUp, 拿到第
0 个pool
 RoundRobinPickUp, 拿到第RoundRobinPickUp, 拿到第 0 个pool
RoundRobinPickUp, 拿到第 11  个pool个pool

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Flutter&Dart

DartVM服务器开发(第二天)--处理请求

在上一节中,我们对所有请求都回复它一个Hello World!这个信息,我们现在改变一下,添加一个方法,传入request,把Hello World!这一条注释...

27620
来自专栏行者悟空

Hadoop之RPC机制

12710
来自专栏C/C++基础

进程与线程的区别

在开发工作中,尤其是对负载较大的服务端程序的开发,为充分发挥处理器多核性能,提高硬件资源利用率,增加系统吞吐量,少不了并发编程。并发编程一般通过多进程和多线程的...

9730
来自专栏Java架构师进阶

java多线程高级教程,这些你都懂了吗?

一、countdownLatch和cyclicbarrier(这两个做多线程控制很好用,工作中会经常用到)

11440
来自专栏有趣的django

Flask请求扩展和数据库连接池

32700
来自专栏osc同步分享-java技术分享站

How Tomcat Works, A Guide to Developing Your Own Java Servlet Container

1.1 socket网络通信基础 客户端使用主机地址和端口实例化一个socket,此socket通过输出流将字符串等传向服务器主机。 服务器使用端口号实例化一个...

29660
来自专栏偏前端工程师的驿站

深入线程

前言                                         在校时认识的线程就是获取CPU执行时间的最小单位,多个线程共享所在进程的资...

201100
来自专栏Linux驱动

45.INIT_WORK()工作队列使用

中断中通过调用schedule_work(work)来通知内核线程,然后中断结束后,再去继续执行work对应的func函数

22310
来自专栏大内老A

我的WCF之旅(3):在WCF中实现双工通信

双工(Duplex)模式的消息交换方式体现在消息交换过程中,参与的双方均可以向对方发送消息。基于双工MEP消息交换可以看成是多个基本模式下(比如请求-回复模式和...

23490
来自专栏JavaEdge

Redis键过期策略

44680

扫码关注云+社区

领取腾讯云代金券