前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ZanDB基于Celery定时任务的二次开发

ZanDB基于Celery定时任务的二次开发

作者头像
用户1278550
发布2018-08-09 14:02:51
7960
发布2018-08-09 14:02:51
举报
文章被收录于专栏:idba

任务系统一期

ZanDB早期的任务需求中,大部分都是针对servant(跑在主机上的agent)做任务调度。也就是说,一期的任务系统,满足的是在特定时刻调用特定主机执行特定的脚本,包括一天的某个时间点,一周的某个时间点,每隔多少时间执行一次调用。

在这个任务需求下,我们首先将相应的任务做好分组,例如,备份任务,备份校验任务等。然后维护一张任务分组和主机的关系表,再加一张任务的脚本表。通过django-crontab 每5分钟去扫描关系表,发现有符合条件需要执行的任务时,就调用servant执行任务,就满足了我们一期的任务需求。

任务系统二期

但是随着任务的增多,出现了其他类型的任务:我需要每天特定时刻执行一个函数,或者在每个月的特定时刻执行某个函数。这些任务有些是只在web服务器上运行的,这时候任务系统一期就开始显得力不从心了。

我们在调研了相关的任务后,发现Celery是非常符合我们的需求Celery 是通过队列实现异步执行任务,通过 Beat 可以实现定时任务调度,和crontab 的格式一模一样。同时,Celery还支持通过djcelery将period task 保存到数据库里面,实现任务的动态新增,编辑和删除,非常符合我们的需求,因此我们打算将djcelery引入进行相应的改造。

一、任务系统代码实现

1.下载djcelery的源代码

首先下载djcelery的源代码,作为django的一个app

代码语言:javascript
复制
git@github.com:celery/django-celery.git

INSTALLED_APPS = [
    ...
    'djcelery',
    'schedule_v2'
    ...
]

2.改造models

  • 新增一个period task的group。

由于原生的period task没有分组,我们又需要对任务进行分组。新增group的目的是方便对一个组的任务进行启用和禁用。例如,今天是双11,备份需要延迟备份,就需要先将整个备份任务禁用掉,到大促结束的时候再开启备份。同时可以对group做args 和 kwargs的设置,这样就不需要针对每个period task都编写重复的参数

代码语言:javascript
复制
   class ZanDBScheduleGroups(models.Model):
    """
    """
    id = models.AutoField(primary_key=True, verbose_name='主键')
    group_name = models.CharField(max_length=40, default='', verbose_name='group name')
    func_name = models.CharField(max_length=100, default='[]', verbose_name='函数名,和zandb_schedule_periodictask表的task对应')
    func_flag = models.BooleanField(default=True, verbose_name='是否需要指定函数,默认是')
    group_args = models.CharField(max_length=255, default='[]', verbose_name='任务参数,json')
    group_kwargs = models.CharField(max_length=255, default='{}', verbose_name='任务参数,json')
    group_desc = models.CharField(max_length=40, default='', verbose_name='schedule 分组描述')
    created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updated_at = models.DateTimeField(auto_now=True, verbose_name='修改时间')
    is_enabled = models.BooleanField(default=True, verbose_name='是否启用,1是,2否')

    class Meta:
        db_table = "zandb_schedule_groups"
  • 针对原有的PeriodicTask model 增加两个字段,crontab model 添加一个字段 djcelery/models.py
代码语言:javascript
复制
@python_2_unicode_compatible
class PeriodicTask(models.Model):
    ...
    # 新增两个字段
    group_id = models.IntegerField(default=0)
    host_id = models.IntegerField(default=0)

    objects = managers.PeriodicTaskManager()
    no_changes = False

    class Meta:
        verbose_name = _('periodic task')
        verbose_name_plural = _('periodic tasks')
        db_table = "zandb_schedule_periodictask"

...

@python_2_unicode_compatible
class CrontabSchedule(models.Model):
    ...
    updated_at = models.DateTimeField(auto_now=True, verbose_name='update time')

PeriodicTask 添加host_id 目的是可以方便的对一个分组下的任务基于host做统计。

Crontab 需要添加一个updated_at字段。因为ZanDB是通过Proxy连接MySQL的,而Proxy的连接是复用的,CLIENT_FOUND_ROWS是针对连接的,为避免污染连接,Proxy是会抛弃这个CLIENT_FOUND_ROWS flag的。

  • Proxy 忽略了CLIENT_FOUND_ROWS
代码语言:javascript
复制
django/db/backends/mysql/base.py:253

kwargs['client_flag'] = CLIENT.FOUND_ROWS

# https://dev.mysql.com/doc/internals/en/capability-flags.html#packet-Protocol::CapabilityFlags

CLIENT_FOUND_ROWS
Send found rows instead of affected rows in EOF_Packet.

设置这个flag之后,MySQL 返回的affected rows 的值表示的就是founds rows了。

代码语言:javascript
复制
django/db/models/base.py:897

# If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
if pk_set and not force_insert:
    base_qs = cls._base_manager.using(using)
    values = [(f, None, (getattr(self, f.attname) if raw else f.pre_save(self, False)))
              for f in non_pks]
    forced_update = update_fields or force_update
    updated = self._do_update(base_qs, using, pk_val, values, update_fields,
                              forced_update)
    if force_update and not updated:
        raise DatabaseError("Forced update did not affect any rows.")
    if update_fields and not updated:
        raise DatabaseError("Save with update_fields did not affect any rows.")
if not updated:
    if meta.order_with_respect_to:
        # If this is a model with an order_with_respect_to
        # autopopulate the _order field
        field = meta.order_with_respect_to
        filter_args = field.get_filter_kwargs_for_object(self)
        order_value = cls._base_manager.using(using).filter(**filter_args).count()
        self._order = order_value

    fields = meta.local_concrete_fields
    if not pk_set:
        fields = [f for f in fields if f is not meta.auto_field]

    update_pk = meta.auto_field and not pk_set
    result = self._do_insert(cls._base_manager, using, fields, update_pk, raw)
    if update_pk:
        setattr(self, meta.pk.attname, result)

而django 在调用 object.save()是时候,会尝试进行UPDATE,判断affect rows是否大于1来确认更新是否成功,如果affect rows为0,则执行INSERT。由于django设置了CLIENT_FOUND_ROWS,即使字段没有任何更新,但是founds rows 为1,因此返回的affect rows 为1。而我们的Proxy又抛弃了CLIENT_FOUND_ROWS 这个flag,此时返回affects rows 为0,所以很不幸,会再次执行insert,就报主键冲突了。所以需要添加updated 字段,避免django这个FLAG的影响。

3.改造DatabaseScheduler

  • djcelery 通过数据库获取任务的类是 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' 。 而获取所有的schedule是通过all_as_schedule 这个函数获取的,因此我们只需要改造这个函数即可。 djcelery/schedulers.py
  • 改造的目的:
  1. 如果group已经被禁用了,那么该group下的的所有Period Task自然需要被禁用;
  2. 将group 的args 和 kwargs 添加到Period Task 里面, 避免重复编写参数
  3. 如果host_id > 0,就将host_id 作为kwargs 传递给具体的执行函数,这样就增加了任务的扩展性。
代码语言:javascript
复制
class DatabaseScheduler(Scheduler):
    
    ...
    
    def all_as_schedule(self):
        debug('DatabaseScheduler: Fetching database schedule')
        s = {}
        for model in self.Model.objects.enabled():
            try:
                if model.name == 'celery.backend_cleanup':
                    s[model.name] = self.Entry(model)
                    info('Adding System Task celery.backend_cleanup')
                else:
                    schedule_group_obj = ZanDBScheduleGroups.objects.get(id=model.group_id)
                    if model.group_id and schedule_group_obj.is_enabled:
                        try:
                            host_name = ZanDBHost.objects.get(id=model.host_id).host_name
                        except ZanDBHost.DoesNotExist:
                            host_name = u'无'
                        info("Adding Task:%s,%s,Host:%s" % (model.task, model.name, host_name))
                        new_kwargs = json.loads(model.kwargs)
                        new_args = json.loads(model.args)
                        if model.host_id > 0:
                            debug('Adding hostid:%s into %s kwargs' % (model.host_id, model.name))
                            new_kwargs['host_id'] = model.host_id

                        schedule_group_obj_kwargs = json.loads(schedule_group_obj.group_kwargs)
                        if schedule_group_obj_kwargs:
                            debug('Adding Schedule Group kwargs:%s into %s kwargs' % (schedule_group_obj.group_kwargs,
                                                                                      model.name))
                            for k, v in schedule_group_obj_kwargs.items():
                                new_kwargs[k] = v
                        schedule_group_obj_args = json.loads(schedule_group_obj.group_args)
                        if schedule_group_obj_args:
                            debug('Adding Schedule Group args:%s into %s args' % (schedule_group_obj.group_args,
                                                                                  model.name))

                            new_args.extend(schedule_group_obj_args)

                        model.kwargs = json.dumps(new_kwargs)
                        model.args = json.dumps(new_args)
                        s[model.name] = self.Entry(model)
                    else:
                        info("Job Group %s is disabled, skip %s %s" % (model.group_id, model.name, model.task))
            except ZanDBScheduleGroups.DoesNotExist:
                info("Period Schedule:%s %s not belong to any Job Group,Skip... " % (model.name, model.task))
                pass
            except Exception as e:
                error(traceback.format_exc(e))
                pass

        return s

4.改造任务动态获取

  • 删除所有的动态获取signal

Celery worker 是通过PeriodicTasks model的last update 字段去判断是否需要拉取最新的任务列表,如果时间没有发生变更,那么就不需要拉取;如果值发生了改变,就调用all_as_schedule拉取最新的任务列表。

代码语言:javascript
复制
djcelery/models.py:315

# signals.pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)
# signals.pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask)
  • 添加自定义的任务重载

在删除和保存前进行修改更新时间,似乎不合理,应该更新完成后,执行更新,避免Beat拉取到了老的任务列表。我们将这两行代码注释掉,利用装饰器装饰view的方法来修改最后更新时间,避免批量修改操作频繁拉取任务列表。

代码语言:javascript
复制
def reload_celery(func):
    def wrapper(*args, **kwargs):
        r = func(*args, **kwargs)
        PeriodicTasks.changed(PeriodicTask)
        return r
    return wrapper  
  • 添加servant的task 函数
代码语言:javascript
复制
schedule_v2/tasks.py

@shared_task(bind=True)
def servant_tasks(self, servant_cmd, host_id, *args, **kwargs):
    """
    执行servant 命令的任务
    :param self:
    :param servant_cmd: servant 的调用命令
    :param host_id: zandb host  id
    :param args:
    :param kwargs:
    :return:
    """
    # 获取对应的主机和IP地址
    get_host_ip_sql = 'SELECT id,ip,host_name,agent_port,status FROM zandb_host WHERE id = %s  limit 1'
    task_id = self.request.id
    mydb = MySQLUtils()
    try:
            host_info = mydb.select(get_host_ip_sql, (host_id,))
            if host_info and host_info[0]['status'] in [1, 3]:
                if servant_cmd:
                        uri = servant_cmd + '?schedule_id=' + task_id
                        try:
                            call_agent_by_hostname_and_uri(host_info[0]['host_name'], uri, ip_flag=True,
                                                           special_port=host_info[0]['agent_port'])

                        except Exception as e:
                            logger_err.error(traceback.format_exc(e))

                else:
                    logger_err.error(self.id + ' lose servant cmd')
            else:
                logger_info.warning('%s Status is %s,Skip...' % (host_info[0]['host_name'],
                                                                 HOST_STATUS[str(host_info[0]['status'])]))

    except Exception as e:
        logger_err.error(traceback.format_exc(e))

最后添加上装饰器

代码语言:javascript
复制
@reload_celery
def batch_task_add(request, groupid):
    """批量添加任务"""
    request_data = json.loads(request.body)
    try:
        with transaction.atomic():
            for item in request_data:
                PeriodicTask.objects.create(group_id=groupid, **item)
    except Exception as e:

        return HttpResponse(complex_jsondumps({'result': str(e.args)+str(e.message)}), content_type='application/json')

    return HttpResponse(complex_jsondumps({'result': 'ok'}), content_type='application/json')

二、任务系统界面

1.任务分组界面

可以新增、修改、删除任务分组,设置任务分组是否需要统一函数,任务分组的参数,启用禁用等。

2.任务列表页

查看任务具体的执行情况,任务列表,函数等。新增、修改、删除单个任务,同时支持批量添加

3.批量添加

设置好想要的crontab和参数后,选择对应的主机,添加添加任务,就可以实现批量添加,简化重复劳动

小结

作为DBA,喜欢自己写SQL去实现相应的逻辑,更加直观一些。使用django对象模型等开源的框架也有很大的好处,可以简化很多代码,减少重复劳动。但是在使用的过程中,还是需要小心有些坑,多做测试,根据自己的环境和逻辑进行相应的改造,满足需求。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-02-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 任务系统一期
    • 任务系统二期
      • 一、任务系统代码实现
      • 2.改造models
      • 3.改造DatabaseScheduler
      • 4.改造任务动态获取
      • 二、任务系统界面
    • 小结
    相关产品与服务
    数据库
    云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档