前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python分布式事务方案(一)tcc

python分布式事务方案(一)tcc

作者头像
一笠风雨任生平
发布2019-08-02 11:08:16
1.3K0
发布2019-08-02 11:08:16
举报
文章被收录于专栏:服务化进程服务化进程

python分布式事务方案(一)tcc

随着单体应用的拆分以及服务化的流行,现在分布式事务已经比较常见,分布式事务理论ACID、CAP、BASE等我就不说了,现在就直接说一下一种常见的解决方案-tcc TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

  • Try 阶段主要是对业务系统做检测及资源预留
  • Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
  • Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

优点: 跟和两阶段提交比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些

缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

下面介绍下我们应用的一种场景,有一个运维系统需要运用到zabbix,而运维系统拆分出了一个配置中心,下面是子系统依赖图

在这里插入图片描述
在这里插入图片描述

在配置告警策略时需要调用zabbix接口

在这里插入图片描述
在这里插入图片描述

这时就涉及到一个分布式事务。由于我们这里只涉及到两个事务,所以我这里就写了一个zabbix代理client,来作为事务协调器

代码语言:javascript
复制
class ZabbixClientProxy(object):
    '''
    zabbix client simple proxy
    '''
    client = models.get_zbx_client()

    def __init__(self):
        self.create_triggers = list()
        self.update_triggers = list()
        self.delete_triggers = list()
        self.update_macros = list()

    def trigger_create(self, name, expression,uuid):
        try:
            trigger = self.client.hosts.trigger_create(name, expression, 1)
            trigger["uuid"]=uuid
            self.create_triggers.append(trigger)
            logger.debug("trigger_create " + name)
            return trigger
        except Exception, e:
            logger.error("trigger_create fail,cause by " + e.message)
            raise

    def trigger_update(self, triggerid, name, expression,uuid):
        try:
            logger.debug("trigger_update " + name)
            old_trigger = self.client.hosts.trigger_get(triggerid)
            update_result = self.client.hosts.trigger_update(
                    triggerid, name=name, expression=expression, priority=1, enable=True)
            old_trigger["uuid"]=uuid
            logger.debug(old_trigger)
            self.update_triggers.append(old_trigger)
            return update_result
        except Exception, e:
            logger.error("trigger_update fail,cause by " + e.message)

    def trigger_delete(self, triggerid,uuid):
        try:
            logger.debug("trigger_delete " + triggerid)
            old_trigger = self.client.hosts.trigger_get(triggerid)
            delete_result = self.client.hosts.trigger_delete(triggerid)
            old_trigger["uuid"]=uuid
            self.delete_triggers.append(old_trigger)
            return delete_result
        except Exception, e:
            logger.error("trigger_delete fail,cause by " + e.message)

    def update_trigger_macro(self, uuid, item_threshold, alert_duration):
        all_hmacros = self.get_macro_by_name(uuid)
        if all_hmacros and len(all_hmacros) > 2:
            self.update_macro(all_hmacros, "DISK_USER_MAX", item_threshold)
            self.update_macro(all_hmacros, "DISK_USER_TIMES", str(alert_duration) + "m")
            self.update_macro(all_hmacros, "DISK_USER_ENABLE", 1)
        else:
            self.create_macro("DISK_USER_MAX", item_threshold, uuid)
            self.create_macro("DISK_USER_TIMES", str(alert_duration) + "m", uuid)
            self.create_macro("DISK_USER_ENABLE", 1, uuid)

    def stop_trigger(self, assets):
        if assets:
            for asset in assets:
                if asset.host is None:
                    continue
                all_hmacros = self.get_macro_by_name(asset.host.uuid)
                if all_hmacros and len(all_hmacros) > 2:
                    self.update_macro(all_hmacros, "DISK_USER_ENABLE", 0)
                else:
                    self.create_macro("DISK_USER_MAX", 80, asset.host.uuid)
                    self.create_macro("DISK_USER_TIMES", "5m", asset.host.uuid)
                    self.create_macro("DISK_USER_ENABLE", 0, asset.host.uuid)

    def get_macro_by_name(self, uuid):
        return self.client.macros.list(uuid)

    def update_macro(self, all_hmacros, macro_name, value):
        for macro in all_hmacros:
            if macro['macro'] == ('{$' + macro_name + '}'):
                try:
                    self.client.macros.update(macro['hostmacroid'], macro=macro_name, value=value)
                    macro['name'] = macro_name
                    self.update_macros.append(macro)
                    logger.debug('update_macro ' + macro_name + ' to ' + str(value))
                except Exception, e:
                    logger.error('update_macro ' + macro_name + ' fail,case by ' + e.message)

    def create_macro(self, macro_name, value, uuid):
        try:
            hostid = self.client.macros._get_hostid(uuid)
            hmacro = self.client.macros.create(macro_name, value, hostid)
            logger.debug("create_macro success,macro_name:" + macro_name + ",value:" + str(value))
        except Exception, e:
            logger.error("create_macro fail,cause by " + e.message)

    def trigger_get(self, triggerid):
        return self.client.hosts.trigger_get(triggerid)

    def trigger_list(self, hostid):
        return self.client.hosts.trigger_list(hostid)

    def item_list(self, uuid):
        return self.client.hosts.item_list(uuid)

    def rollback(self):
        logger.debug("start rollback")
        # rollback create
        for trigger in self.create_triggers:
            try:
                self.client.hosts.trigger_delete(trigger["triggerid"])
                logger.debug('rollback_create_trigger ' + trigger["name"])
            except Exception, e:
                logger.error('rollback_create_trigger ' + trigger["triggerid"] + ' fail,case by ' + str(e.message))
        self.create_triggers = []
        for trigger in self.update_triggers:
            try:
                expression=trigger["expression"].replace(trigger['uuid']+']','{HOST.HOST}]')
                self.client.hosts.trigger_update(trigger["triggerid"], name=trigger["name"],
                                                 expression=expression, priority=1, enable=True)
                logger.debug('rollback_update_trigger ' + trigger["name"])

            except Exception, e:
                logger.error('rollback_update_trigger ' + trigger["triggerid"] + ' fail,case by ' + str(e.message))
        self.update_triggers = []
        for trigger in self.delete_triggers:
            try:
                expression=trigger["expression"].replace(trigger['uuid']+']','{HOST.HOST}]')
                new_trigger = self.client.hosts.trigger_create(trigger["name"], expression, 1)
                logger.debug(new_trigger)
                logger.debug('rollback_delete_trigger ' + trigger["name"])
                # 更新数据中的zabbix trigger id
                alert_models.ConditionTrigger.objects.filter(zabbix_trigger_id=trigger["triggerid"]).update(
                        zabbix_trigger_id=new_trigger["triggerid"])
            except Exception, e:
                logger.error('rollback_delete_trigger ' + trigger["triggerid"] + ' fail,case by ' + str(e.message))
        self.delete_triggers = []

        for macro in self.update_macros:
            try:
                self.client.macros.update(macro['hostmacroid'], macro=macro['name'], value=macro['value'])
            except Exception, e:
                logger.error('rollback_update_macro ' + macro['name'] + ' fail,case by ' + str(e.message))
        logger.debug("end rollback")

事务成功,则提交本地事务,如果失败则调用rollback

代码语言:javascript
复制
def create(self, request, *args, **kwargs):
    '''
    policy add
    '''
    assets = request.data["data"]
    client = ZabbixClientProxy()
    try:
        with transaction.atomic():
            #save policy
            #将client作为参数,对主机、监控项、触发器进行增删改
    except rest_framework_serializers.ValidationError, e:
        logger.exception(e)
        client.rollback()
        raise

这样做还有一个问题就是,在回滚中如果网络突然断了这时会回滚失败,这里我们记录了日志,后面我们会通过扫描日志来做到最终一致性,这里我们后面坐了补偿,下一次修改时会自动修正回滚失败问题。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • python分布式事务方案(一)tcc
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档