ZanDB早期的任务需求中,大部分都是针对servant(跑在主机上的agent)做任务调度。也就是说,一期的任务系统,满足的是在特定时刻调用特定主机执行特定的脚本,包括一天的某个时间点,一周的某个时间点,每隔多少时间执行一次调用。
在这个任务需求下,我们首先将相应的任务做好分组,例如,备份任务,备份校验任务等。然后维护一张任务分组和主机的关系表,再加一张任务的脚本表。通过django-crontab 每5分钟去扫描关系表,发现有符合条件需要执行的任务时,就调用servant执行任务,就满足了我们一期的任务需求。
但是随着任务的增多,出现了其他类型的任务:我需要每天特定时刻执行一个函数,或者在每个月的特定时刻执行某个函数。这些任务有些是只在web服务器上运行的,这时候任务系统一期就开始显得力不从心了。
我们在调研了相关的任务后,发现Celery是非常符合我们的需求Celery 是通过队列实现异步执行任务,通过 Beat 可以实现定时任务调度,和crontab 的格式一模一样。同时,Celery还支持通过djcelery将period task 保存到数据库里面,实现任务的动态新增,编辑和删除,非常符合我们的需求,因此我们打算将djcelery引入进行相应的改造。
首先下载djcelery的源代码,作为django的一个app
git@github.com:celery/django-celery.git
INSTALLED_APPS = [
...
'djcelery',
'schedule_v2'
...
]
由于原生的period task没有分组,我们又需要对任务进行分组。新增group的目的是方便对一个组的任务进行启用和禁用。例如,今天是双11,备份需要延迟备份,就需要先将整个备份任务禁用掉,到大促结束的时候再开启备份。同时可以对group做args 和 kwargs的设置,这样就不需要针对每个period task都编写重复的参数
class ZanDBScheduleGroups(models.Model):
"""
"""
id = models.AutoField(primary_key=True, verbose_name='主键')
group_name = models.CharField(max_length=40, default='', verbose_name='group name')
func_name = models.CharField(max_length=100, default='[]', verbose_name='函数名,和zandb_schedule_periodictask表的task对应')
func_flag = models.BooleanField(default=True, verbose_name='是否需要指定函数,默认是')
group_args = models.CharField(max_length=255, default='[]', verbose_name='任务参数,json')
group_kwargs = models.CharField(max_length=255, default='{}', verbose_name='任务参数,json')
group_desc = models.CharField(max_length=40, default='', verbose_name='schedule 分组描述')
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated_at = models.DateTimeField(auto_now=True, verbose_name='修改时间')
is_enabled = models.BooleanField(default=True, verbose_name='是否启用,1是,2否')
class Meta:
db_table = "zandb_schedule_groups"
djcelery/models.py
@python_2_unicode_compatible
class PeriodicTask(models.Model):
...
# 新增两个字段
group_id = models.IntegerField(default=0)
host_id = models.IntegerField(default=0)
objects = managers.PeriodicTaskManager()
no_changes = False
class Meta:
verbose_name = _('periodic task')
verbose_name_plural = _('periodic tasks')
db_table = "zandb_schedule_periodictask"
...
@python_2_unicode_compatible
class CrontabSchedule(models.Model):
...
updated_at = models.DateTimeField(auto_now=True, verbose_name='update time')
PeriodicTask 添加host_id 目的是可以方便的对一个分组下的任务基于host做统计。
Crontab 需要添加一个updated_at字段。因为ZanDB是通过Proxy连接MySQL的,而Proxy的连接是复用的,CLIENT_FOUND_ROWS是针对连接的,为避免污染连接,Proxy是会抛弃这个CLIENT_FOUND_ROWS flag的。
django/db/backends/mysql/base.py:253
kwargs['client_flag'] = CLIENT.FOUND_ROWS
# https://dev.mysql.com/doc/internals/en/capability-flags.html#packet-Protocol::CapabilityFlags
CLIENT_FOUND_ROWS
Send found rows instead of affected rows in EOF_Packet.
设置这个flag之后,MySQL 返回的affected rows 的值表示的就是founds rows了。
django/db/models/base.py:897
# If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
if pk_set and not force_insert:
base_qs = cls._base_manager.using(using)
values = [(f, None, (getattr(self, f.attname) if raw else f.pre_save(self, False)))
for f in non_pks]
forced_update = update_fields or force_update
updated = self._do_update(base_qs, using, pk_val, values, update_fields,
forced_update)
if force_update and not updated:
raise DatabaseError("Forced update did not affect any rows.")
if update_fields and not updated:
raise DatabaseError("Save with update_fields did not affect any rows.")
if not updated:
if meta.order_with_respect_to:
# If this is a model with an order_with_respect_to
# autopopulate the _order field
field = meta.order_with_respect_to
filter_args = field.get_filter_kwargs_for_object(self)
order_value = cls._base_manager.using(using).filter(**filter_args).count()
self._order = order_value
fields = meta.local_concrete_fields
if not pk_set:
fields = [f for f in fields if f is not meta.auto_field]
update_pk = meta.auto_field and not pk_set
result = self._do_insert(cls._base_manager, using, fields, update_pk, raw)
if update_pk:
setattr(self, meta.pk.attname, result)
而django 在调用 object.save()是时候,会尝试进行UPDATE,判断affect rows是否大于1来确认更新是否成功,如果affect rows为0,则执行INSERT。由于django设置了CLIENT_FOUND_ROWS,即使字段没有任何更新,但是founds rows 为1,因此返回的affect rows 为1。而我们的Proxy又抛弃了CLIENT_FOUND_ROWS 这个flag,此时返回affects rows 为0,所以很不幸,会再次执行insert,就报主键冲突了。所以需要添加updated 字段,避免django这个FLAG的影响。
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
。 而获取所有的schedule是通过all_as_schedule 这个函数获取的,因此我们只需要改造这个函数即可。 djcelery/schedulers.py
class DatabaseScheduler(Scheduler):
...
def all_as_schedule(self):
debug('DatabaseScheduler: Fetching database schedule')
s = {}
for model in self.Model.objects.enabled():
try:
if model.name == 'celery.backend_cleanup':
s[model.name] = self.Entry(model)
info('Adding System Task celery.backend_cleanup')
else:
schedule_group_obj = ZanDBScheduleGroups.objects.get(id=model.group_id)
if model.group_id and schedule_group_obj.is_enabled:
try:
host_name = ZanDBHost.objects.get(id=model.host_id).host_name
except ZanDBHost.DoesNotExist:
host_name = u'无'
info("Adding Task:%s,%s,Host:%s" % (model.task, model.name, host_name))
new_kwargs = json.loads(model.kwargs)
new_args = json.loads(model.args)
if model.host_id > 0:
debug('Adding hostid:%s into %s kwargs' % (model.host_id, model.name))
new_kwargs['host_id'] = model.host_id
schedule_group_obj_kwargs = json.loads(schedule_group_obj.group_kwargs)
if schedule_group_obj_kwargs:
debug('Adding Schedule Group kwargs:%s into %s kwargs' % (schedule_group_obj.group_kwargs,
model.name))
for k, v in schedule_group_obj_kwargs.items():
new_kwargs[k] = v
schedule_group_obj_args = json.loads(schedule_group_obj.group_args)
if schedule_group_obj_args:
debug('Adding Schedule Group args:%s into %s args' % (schedule_group_obj.group_args,
model.name))
new_args.extend(schedule_group_obj_args)
model.kwargs = json.dumps(new_kwargs)
model.args = json.dumps(new_args)
s[model.name] = self.Entry(model)
else:
info("Job Group %s is disabled, skip %s %s" % (model.group_id, model.name, model.task))
except ZanDBScheduleGroups.DoesNotExist:
info("Period Schedule:%s %s not belong to any Job Group,Skip... " % (model.name, model.task))
pass
except Exception as e:
error(traceback.format_exc(e))
pass
return s
Celery worker 是通过PeriodicTasks model的last update 字段去判断是否需要拉取最新的任务列表,如果时间没有发生变更,那么就不需要拉取;如果值发生了改变,就调用all_as_schedule拉取最新的任务列表。
djcelery/models.py:315
# signals.pre_delete.connect(PeriodicTasks.changed, sender=PeriodicTask)
# signals.pre_save.connect(PeriodicTasks.changed, sender=PeriodicTask)
在删除和保存前进行修改更新时间,似乎不合理,应该更新完成后,执行更新,避免Beat拉取到了老的任务列表。我们将这两行代码注释掉,利用装饰器装饰view的方法来修改最后更新时间,避免批量修改操作频繁拉取任务列表。
def reload_celery(func):
def wrapper(*args, **kwargs):
r = func(*args, **kwargs)
PeriodicTasks.changed(PeriodicTask)
return r
return wrapper
schedule_v2/tasks.py
@shared_task(bind=True)
def servant_tasks(self, servant_cmd, host_id, *args, **kwargs):
"""
执行servant 命令的任务
:param self:
:param servant_cmd: servant 的调用命令
:param host_id: zandb host id
:param args:
:param kwargs:
:return:
"""
# 获取对应的主机和IP地址
get_host_ip_sql = 'SELECT id,ip,host_name,agent_port,status FROM zandb_host WHERE id = %s limit 1'
task_id = self.request.id
mydb = MySQLUtils()
try:
host_info = mydb.select(get_host_ip_sql, (host_id,))
if host_info and host_info[0]['status'] in [1, 3]:
if servant_cmd:
uri = servant_cmd + '?schedule_id=' + task_id
try:
call_agent_by_hostname_and_uri(host_info[0]['host_name'], uri, ip_flag=True,
special_port=host_info[0]['agent_port'])
except Exception as e:
logger_err.error(traceback.format_exc(e))
else:
logger_err.error(self.id + ' lose servant cmd')
else:
logger_info.warning('%s Status is %s,Skip...' % (host_info[0]['host_name'],
HOST_STATUS[str(host_info[0]['status'])]))
except Exception as e:
logger_err.error(traceback.format_exc(e))
最后添加上装饰器
@reload_celery
def batch_task_add(request, groupid):
"""批量添加任务"""
request_data = json.loads(request.body)
try:
with transaction.atomic():
for item in request_data:
PeriodicTask.objects.create(group_id=groupid, **item)
except Exception as e:
return HttpResponse(complex_jsondumps({'result': str(e.args)+str(e.message)}), content_type='application/json')
return HttpResponse(complex_jsondumps({'result': 'ok'}), content_type='application/json')
可以新增、修改、删除任务分组,设置任务分组是否需要统一函数,任务分组的参数,启用禁用等。
查看任务具体的执行情况,任务列表,函数等。新增、修改、删除单个任务,同时支持批量添加
设置好想要的crontab和参数后,选择对应的主机,添加添加任务,就可以实现批量添加,简化重复劳动
作为DBA,喜欢自己写SQL去实现相应的逻辑,更加直观一些。使用django对象模型等开源的框架也有很大的好处,可以简化很多代码,减少重复劳动。但是在使用的过程中,还是需要小心有些坑,多做测试,根据自己的环境和逻辑进行相应的改造,满足需求。