ZanDB基于Celery定时任务的二次开发

任务系统一期

ZanDB早期的任务需求中,大部分都是针对servant(跑在主机上的agent)做任务调度。也就是说,一期的任务系统,满足的是在特定时刻调用特定主机执行特定的脚本,包括一天的某个时间点,一周的某个时间点,每隔多少时间执行一次调用。

在这个任务需求下,我们首先将相应的任务做好分组,例如,备份任务,备份校验任务等。然后维护一张任务分组和主机的关系表,再加一张任务的脚本表。通过django-crontab 每5分钟去扫描关系表,发现有符合条件需要执行的任务时,就调用servant执行任务,就满足了我们一期的任务需求。

任务系统二期

但是随着任务的增多,出现了其他类型的任务:我需要每天特定时刻执行一个函数,或者在每个月的特定时刻执行某个函数。这些任务有些是只在web服务器上运行的,这时候任务系统一期就开始显得力不从心了。

我们在调研了相关的任务后,发现Celery是非常符合我们的需求Celery 是通过队列实现异步执行任务,通过 Beat 可以实现定时任务调度,和crontab 的格式一模一样。同时,Celery还支持通过djcelery将period task 保存到数据库里面,实现任务的动态新增,编辑和删除,非常符合我们的需求,因此我们打算将djcelery引入进行相应的改造。

一、任务系统代码实现

1.下载djcelery的源代码

首先下载djcelery的源代码,作为django的一个app

git@github.com:celery/django-celery.git

INSTALLED_APPS = [
    ...
    'djcelery',
    'schedule_v2'
    ...
]

2.改造models

  • 新增一个period task的group。

由于原生的period task没有分组,我们又需要对任务进行分组。新增group的目的是方便对一个组的任务进行启用和禁用。例如,今天是双11,备份需要延迟备份,就需要先将整个备份任务禁用掉,到大促结束的时候再开启备份。同时可以对group做args 和 kwargs的设置,这样就不需要针对每个period task都编写重复的参数

   class ZanDBScheduleGroups(models.Model):
    """
    """
    id = models.AutoField(primary_key=True, verbose_name='主键')
    group_name = models.CharField(max_length=40, default='', verbose_name='group name')
    func_name = models.CharField(max_length=100, default='[]', verbose_name='函数名,和zandb_schedule_periodictask表的task对应')
    func_flag = models.BooleanField(default=True, verbose_name='是否需要指定函数,默认是')
    group_args = models.CharField(max_length=255, default='[]', verbose_name='任务参数,json')
    group_kwargs = models.CharField(max_length=255, default='{}', verbose_name='任务参数,json')
    group_desc = models.CharField(max_length=40, default='', verbose_name='schedule 分组描述')
    created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updated_at = models.DateTimeField(auto_now=True, verbose_name='修改时间')
    is_enabled = models.BooleanField(default=True, verbose_name='是否启用,1是,2否')

    class Meta:
        db_table = "zandb_schedule_groups"
  • 针对原有的PeriodicTask model 增加两个字段,crontab model 添加一个字段 djcelery/models.py
@python_2_unicode_compatible
class PeriodicTask(models.Model):
    ...
    # 新增两个字段
    group_id = models.IntegerField(default=0)
    host_id = models.IntegerField(default=0)

    objects = managers.PeriodicTaskManager()
    no_changes = False

    class Meta:
        verbose_name = _('periodic task')
        verbose_name_plural = _('periodic tasks')
        db_table = "zandb_schedule_periodictask"

...

@python_2_unicode_compatible
class CrontabSchedule(models.Model):
    ...
    updated_at = models.DateTimeField(auto_now=True, verbose_name='update time')

PeriodicTask 添加host_id 目的是可以方便的对一个分组下的任务基于host做统计。

Crontab 需要添加一个updated_at字段。因为ZanDB是通过Proxy连接MySQL的,而Proxy的连接是复用的,CLIENT_FOUND_ROWS是针对连接的,为避免污染连接,Proxy是会抛弃这个CLIENT_FOUND_ROWS flag的。

  • Proxy 忽略了CLIENT_FOUND_ROWS
django/db/backends/mysql/base.py:253

kwargs['client_flag'] = CLIENT.FOUND_ROWS

# https://dev.mysql.com/doc/internals/en/capability-flags.html#packet-Protocol::CapabilityFlags

CLIENT_FOUND_ROWS
Send found rows instead of affected rows in EOF_Packet.

设置这个flag之后,MySQL 返回的affected rows 的值表示的就是founds rows了。

django/db/models/base.py:897

# If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
if pk_set and not force_insert:
    base_qs = cls._base_manager.using(using)
    values = [(f, None, (getattr(self, f.attname) if raw else f.pre_save(self, False)))
              for f in non_pks]
    forced_update = update_fields or force_update
    updated = self._do_update(base_qs, using, pk_val, values, update_fields,
                              forced_update)
    if force_update and not updated:
        raise DatabaseError("Forced update did not affect any rows.")
    if update_fields and not updated:
        raise DatabaseError("Save with update_fields did not affect any rows.")
if not updated:
    if meta.order_with_respect_to:
        # If this is a model with an order_with_respect_to
        # autopopulate the _order field
        field = meta.order_with_respect_to
        filter_args = field.get_filter_kwargs_for_object(self)
        order_value = cls._base_manager.using(using).filter(**filter_args).count()
        self._order = order_value

    fields = meta.local_concrete_fields
    if not pk_set:
        fields = [f for f in fields if f is not meta.auto_field]

    update_pk = meta.auto_field and not pk_set
    result = self._do_insert(cls._base_manager, using, fields, update_pk, raw)
    if update_pk:
        setattr(self, meta.pk.attname, result)

而django 在调用 object.save()是时候,会尝试进行UPDATE,判断affect rows是否大于1来确认更新是否成功,如果affect rows为0,则执行INSERT。由于django设置了CLIENT_FOUND_ROWS,即使字段没有任何更新,但是founds rows 为1,因此返回的affect rows 为1。而我们的Proxy又抛弃了CLIENT_FOUND_ROWS 这个flag,此时返回affects rows 为0,所以很不幸,会再次执行insert,就报主键冲突了。所以需要添加updated 字段,避免django这个FLAG的影响。

3.改造DatabaseScheduler

  • djcelery 通过数据库获取任务的类是 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' 。 而获取所有的schedule是通过all_as_schedule 这个函数获取的,因此我们只需要改造这个函数即可。 djcelery/schedulers.py
  • 改造的目的:
  1. 如果group已经被禁用了,那么该group下的的所有Period Task自然需要被禁用;
  2. 将group 的args 和 kwargs 添加到Period Task 里面, 避免重复编写参数
  3. 如果host_id > 0,就将host_id 作为kwargs 传递给具体的执行函数,这样就增加了任务的扩展性。
class DatabaseScheduler(Scheduler):
    
    ...
    
    def all_as_schedule(self):
        debug('DatabaseScheduler: Fetching database schedule')
        s = {}
        for model in self.Model.objects.enabled():
            try:
                if model.name == 'celery.backend_cleanup':
                    s[model.name] = self.Entry(model)
                    info('Adding System Task celery.backend_cleanup')
                else:
                    schedule_group_obj = ZanDBScheduleGroups.objects.get(id=model.group_id)
                    if model.group_id and schedule_group_obj.is_enabled:
                        try:
                            host_name = ZanDBHost.objects.get(id=model.host_id).host_name
                        except ZanDBHost.DoesNotExist:
                            host_name = u'无'
                        info("Adding Task:%s,%s,Host:%s" % (model.task, model.name, host_name))
                        new_kwargs = json.loads(model.kwargs)
                        new_args = json.loads(model.args)
                        if model.host_id > 0:
                            debug('Adding hostid:%s into %s kwargs' % (model.host_id, model.name))
                            new_kwargs['host_id'] = model.host_id

                        schedule_group_obj_kwargs = json.loads(schedule_group_obj.group_kwargs)
                        if schedule_group_obj_kwargs:
                            debug('Adding Schedule Group kwargs:%s into %s kwargs' % (schedule_group_obj.group_kwargs,
                                                                                      model.name))
                            for k, v in schedule_group_obj_kwargs.items():
                                new_kwargs[k] = v
                        schedule_group_obj_args = json.loads(schedule_group_obj.group_args)
                        if schedule_group_obj_args:
                            debug('Adding Schedule Group args:%s into %s args' % (schedule_group_obj.group_args,
                                                                                  model.name))

                            new_args.extend(schedule_group_obj_args)

                        model.kwargs = json.dumps(new_kwargs)
                        model.args = json.dumps(new_args)
                        s[model.name] = self.Entry(model)
                    else:
                        info("Job Group %s is disabled, skip %s %s" % (model.group_id, model.name, model.task))
            except ZanDBScheduleGroups.DoesNotExist:
                info("Period Schedule:%s %s not belong to any Job Group,Skip... " % (model.name, model.task))
                pass
            except Exception as e:
                error(traceback.format_exc(e))
                pass

        return s

4.改造任务动态获取

  • 删除所有的动态获取signal

Celery worker 是通过PeriodicTasks model的last update 字段去判断是否需要拉取最新的任务列表,如果时间没有发生变更,那么就不需要拉取;如果值发生了改变,就调用all_as_schedule拉取最新的任务列表。

djcelery/models.py:315

# signals.pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)
# signals.pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask)
  • 添加自定义的任务重载

在删除和保存前进行修改更新时间,似乎不合理,应该更新完成后,执行更新,避免Beat拉取到了老的任务列表。我们将这两行代码注释掉,利用装饰器装饰view的方法来修改最后更新时间,避免批量修改操作频繁拉取任务列表。

def reload_celery(func):
    def wrapper(*args, **kwargs):
        r = func(*args, **kwargs)
        PeriodicTasks.changed(PeriodicTask)
        return r
    return wrapper  
  • 添加servant的task 函数
schedule_v2/tasks.py

@shared_task(bind=True)
def servant_tasks(self, servant_cmd, host_id, *args, **kwargs):
    """
    执行servant 命令的任务
    :param self:
    :param servant_cmd: servant 的调用命令
    :param host_id: zandb host  id
    :param args:
    :param kwargs:
    :return:
    """
    # 获取对应的主机和IP地址
    get_host_ip_sql = 'SELECT id,ip,host_name,agent_port,status FROM zandb_host WHERE id = %s  limit 1'
    task_id = self.request.id
    mydb = MySQLUtils()
    try:
            host_info = mydb.select(get_host_ip_sql, (host_id,))
            if host_info and host_info[0]['status'] in [1, 3]:
                if servant_cmd:
                        uri = servant_cmd + '?schedule_id=' + task_id
                        try:
                            call_agent_by_hostname_and_uri(host_info[0]['host_name'], uri, ip_flag=True,
                                                           special_port=host_info[0]['agent_port'])

                        except Exception as e:
                            logger_err.error(traceback.format_exc(e))

                else:
                    logger_err.error(self.id + ' lose servant cmd')
            else:
                logger_info.warning('%s Status is %s,Skip...' % (host_info[0]['host_name'],
                                                                 HOST_STATUS[str(host_info[0]['status'])]))

    except Exception as e:
        logger_err.error(traceback.format_exc(e))

最后添加上装饰器

@reload_celery
def batch_task_add(request, groupid):
    """批量添加任务"""
    request_data = json.loads(request.body)
    try:
        with transaction.atomic():
            for item in request_data:
                PeriodicTask.objects.create(group_id=groupid, **item)
    except Exception as e:

        return HttpResponse(complex_jsondumps({'result': str(e.args)+str(e.message)}), content_type='application/json')

    return HttpResponse(complex_jsondumps({'result': 'ok'}), content_type='application/json')

二、任务系统界面

1.任务分组界面

可以新增、修改、删除任务分组,设置任务分组是否需要统一函数,任务分组的参数,启用禁用等。

2.任务列表页

查看任务具体的执行情况,任务列表,函数等。新增、修改、删除单个任务,同时支持批量添加

3.批量添加

设置好想要的crontab和参数后,选择对应的主机,添加添加任务,就可以实现批量添加,简化重复劳动

小结

作为DBA,喜欢自己写SQL去实现相应的逻辑,更加直观一些。使用django对象模型等开源的框架也有很大的好处,可以简化很多代码,减少重复劳动。但是在使用的过程中,还是需要小心有些坑,多做测试,根据自己的环境和逻辑进行相应的改造,满足需求。

原文发表时间:2018-02-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏FreeBuf

Kali下常用安全工具中文参数说明(160个)

*本文原创作者:屌丝绅士,属Freebuf原创奖励计划,转载请注明来自FreeBuf 由于篇幅有限,只列举部分,ps:第一次发有什么不对的 还望各位大大指正 n...

1.3K90
来自专栏张善友的专栏

ASP.NET Web API 处理架构

这篇文章主要是介绍ASP.NET Web API的处理架构:当一个HTTP请求到达直到产生一个请求的过程。ASP.NET Web API 的处理架构图如下,主要...

31380
来自专栏Esofar 开发日记

[译]ASP.NET Core依赖注入深入讨论

这篇文章我们来深入探讨ASP.NET Core、MVC Core中的依赖注入,我们将示范几乎所有可能的操作把依赖项注入到组件中。

12510
来自专栏小狼的世界

Curl操作Elasticsearch的常用方法

Elasticsearch对于文档操作,提供了以下几种API,本文就说明如何使用curl方式来调用这些API。

17220
来自专栏aoho求索

基于可靠消息方案的分布式事务(四):接入Lottor服务

在上一篇文章中,通过Lottor Sample介绍了快速体验分布式事务Lottor。本文将会介绍如何将微服务中的生产方和消费方服务接入Lottor。

30610
来自专栏技术点滴

远程线程注入引出的问题

远程线程注入引出的问题 一、远程线程注入基本原理 远程线程注入——相信对Windows底层编程和系统安全熟悉的人并不陌生,其主要核心在于一个Windows AP...

303100
来自专栏Danny的专栏

ASP.NET实现文件的上传和下载

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

3.6K30
来自专栏容器云生态

Openstack平台搭建之第二天

Openstack平台搭建之第二天 If you have any question ,please contact me by weichuangxxb@si...

544100
来自专栏Jackson0714

sys.dm_db_wait_stats

394120
来自专栏散尽浮华

Centos7下关于系统用户密码规则-运维笔记

1)密码长度、有效期 /etc/login.defs文件是当创建用户时的一些规划,比如创建用户时,是否需要家目录,UID和GID的范围;用户的期限等等,这个文...

44840

扫码关注云+社区

领取腾讯云代金券