在使用django集成celery进行了异步调度任务之后,如果想对失败的任务进行跟踪或者告警,怎么做?
这里提供一个亲测的方法。
1、任务callback
假如你想在任务执行失败的时候,打印错误信息并且发出报警,该怎么搞。有两个方法:
(1)link_error
(2)on_failure/on_success
link_error的方法比较爽,但是我没有亲测过,on_failure的方式,是当任务抛出异常的时候,会触发一些事件,提供给大家代码:
定义一个新类重写Task里的on_success和on_failure方法:
from celery.app.task import Task
class CallbackTask(Task):
def __init__(self):
super(CallbackTask, self).__init__()
def on_success(self, retval, task_id, args, kwargs):
try:
item_param= json.loads(args[0])
logger.info('[task_id] %s, [task_type] %s, finished successfully.' % (task_id, item_param.get('task_type')))
except Exception, ex:
logger.error(traceback.format_exc())
def on_failure(self, exc, task_id, args, kwargs, einfo):
try:
item_param = json.loads(args[0])
logger.error(('Task {0} raised exception: {1!r}\n{2!r}'.format(
task_id, exc, einfo.traceback)))
except Exception, ex:
logger.error(traceback.format_exc())
装饰器使用新类作为baseClass
from celery import task
from common.callback import CallbackTask
logger = logging.getLogger(__name__)
@task(base=CallbackTask)
def quota_check(item_param):
logger.info('start')
return