Celery用户手册 - Tasks

Celery用户手册 - Tasks

Posted April 19, 2016

Tasks是Celery 应用的构建块。事实上Celery应用是由一个或多个Task拼装组成的。

一个Task即是一个对象, Task被创建后可以被所有调用, 它是双重角色, 当Task被调用可以通过Task可以发送消息, 同时当作为一个worker的时候可以接收消息,并消费。

每个Task name 都是唯一的, 这样可以通过这个名字,找到合适的function去执行消费。

当发送一个任务消息在worker确认(acknowledged)前不会消失,一个worker可以提前存储很多消息,如果worker进程崩溃或killed,消息也不会消失, 消息会通过在投递的方式给其他存活worker。

理想的Task函数必须是幂等的,这意味着相同的参数调用多次不会出现不同的结果。但是worker并不知道函数是幂等的, woker默认是提前确认消息, 在执行完成之前这个task永远不会被重复执行。 这个就是上锁(LOCK)意思。这一段和上一段还是有区别的, 这一段强调的是开始执行之前确认

当然确认如果任务是幂等的,你可以设置acks_late选项来控制worker 在函数返回之后去确认消息acknowledge. 请参考: Should I use retry or acks_late?

Basics

你可以很容易的创建任务在任何的可调用函数上使用task()装饰器.

Python

from .models import User

@app.task
def create_user(username, password):
    User.objects.create(username=username, password=password)

可以在装饰器上指定参数, 来设置Task

Python

@app.task(serializer='json')
def create_user(username, password):
    User.objects.create(username=username, password=password)

Multiple decorators

当使用多个装饰器,需要确保任务装饰生效, 把task decorator 写在函数第一个装饰器.

Python

@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y

如何引入task装饰器?

task decorator 存在于你的Celery应用的实例上, 上一节我们已经讲过如何声明Application和使用它.

如果你使用的是Django 或者仍然适用老的版本, 你可能导入task decorator的方式是下面这样.

Python

from celery import task

@task
def add(x, y):
    return x + y

Names

每个任务都有一个唯一的名称, 一个任务创建时如果不提供一个自定义的名字, 将会去生成一个任务.

Python

>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
...     return x + y

>>> add.name
'sum-of-two-numbers'

最好的方式是适用模块名称作为一个名称空间,如果一个任务另外一个模块中也有这样的名称如user模块中有add, group 模块中也有add, 那么这样就会冲突. 最好的方式如下:

Python

>>> @app.task(name='tasks.add')
>>> def add(x, y):
...     return x + y

你可以通过调用task的属性来获取task name.

Python

>>> add.name
'tasks.add'

如果不指定, 默认也会通过模块名和函数名拼装生成name

tasks.py:

Python

@app.task
def add(x, y):
    return x + y

>>> from tasks import add
>>> add.name
'tasks.add'

自动命名和相对import

相对import和自动命名不能一起工作, 所以如果适用相对引入你必须精确的设置name.

如果一个客户端(创建消息的时候) 导入这个myapp.tasks 通过.tasks导入,另外一个worker导入模块通过myapp.tasks, 生成的名称不匹配导致worker会抛出NotRegistered 从而不能工作.

Django INSTALLED_APPS的project.myapp风格.

Python

INSTALLED_APPS = ['project.myapp']

如果你安装app使用project.myapp, 那么task导入的时候也要通过project.myapp.tasks导入, 所以你要确保总是使用相同的名称导入任务.

Python

>>> from project.myapp.tasks import mytask   # << GOOD

>>> from myapp.tasks import mytask    # << BAD!!!

上面第二个例子将导致任务以不同的方式命名, 进而导致客户端和worker不用的任务名称。

Python

>>> from project.myapp.tasks import mytask
>>> mytask.name
'project.myapp.tasks.mytask'

>>> from myapp.tasks import mytask
>>> mytask.name
'myapp.tasks.mytask'

因而你导入必须一致, 这也是python推荐的方式。

同样的你不应该使用旧风格进行相对引入.

Python

from module import foo   # BAD!

from proj.module import foo  # GOOD!

以下新的风格相对引入也是可以推荐的

Python

from .module import foo  # GOOD!

如果你的程序已经做了错的引入, 并且你没有时间去重构, 建议通过显式的指定名称去覆盖自动命名.

Python

@task(name='proj.tasks.add')
def add(x, y):
    return x + y

Context

执行任务request 包含的信息和状态.

request定义了以下属性

key

value

id:

执行任务的唯一ID

group:

组id如果属于组

chord:

The unique id of the chord this task belongs to (if the task is part of the header).

args:

位置参数

kwargs:

键值参数

retries:

重拾次数

is_eager:

如果不是工人是本地客户端,设置为True

eta:

预计任务时间

expires:

任务的过期时

logfile:

worker 的日志文件, See: logging

loglevel:

当前使用的日志级别

hostname:

worker实例的hostname

delivery_info:

额外的传递信息

called_directly:

This flag is set to true if the task was not executed by the worker.

callbacks:

回调函数

errback:

异常回调函数

utc:

如果为True说明调用者启动了utc

3.1 新属性 key|value ---|--- headers:|映射消息头 reply_to:|发送replay到哪个队列 correlation_id:|通常与任务id通用, 常用语amqp的跟踪回复

一个从context获取获取信息的例子:

Python

@app.task(bind=True)
def dump_context(self, x, y):
    print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
            self.request))

Logging

worker 将会自动配置logging, 也可以手动配置定制logging 日志输出.

Celery 提供一个名为celery.task的logger供使用, 你可以通过这个logger 自动的生成一个名称和唯一id作为日志的一部分.

推荐在每个模块中都声明一个logger, 每个模块使用单独的logger.

Python

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y

Celery 使用Python标准库的logging模块, 文档支持可以在logging 模块中看到

你也可以使用print(), 任何写入标准输出和标准错误都会转到日志系统。 所以print的字符也会作为日志记录, 记录等级为WARN.

Retrying

retry() 可以重试任务, 当任务出现可恢复的错误.

当调用retry()时将会发送一个新的消息, 使用相同的task-id, 确保消息和原始任务属于相同的队列.

当一个消息重试后, 任务也会记录一个状态。这样你可以使用结果实例跟踪任务状态记录(see States)

一个使用retry() 的例子:

Python

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

bind 参数告诉装饰器将会给一个可以访问的self实例.

当存储任务结果,exc 是用于传递异常信息用户日志输出。 内容有异常信息和traceback信息都存在于exc.

如果此任务有max_retries值, 并且重试次数超过了这个值, 那么这个exc异常将会重新raise. 如果是下列情况将不会这样:

  • exc 没有指定

这种情况下将会raise MaxRetriesExceeded异常, 这个是默认异常

  • 没有异常

当重试没有异常发生(也就是上面except没有发生), 重试次数达到了, 但task还没有正确返回, 可以指定给exc一个异常, 用于代理默认的MaxRetriesExceeded.

Python

self.retry(exc=Twitter.LoginError())

将会触发提供的异常信息。

自定义重试间隔

当一个任务要去重试, 可以指定一个时间之后再去重试. 使用default_retry_delay属性来设置默认延迟.默认是三分钟, 注意: 延迟的单位是秒.

你可以使用 retry(..., countdown=60s)来覆盖task级别的default_retry_delay时间. 两种方法灵活使用

Python

@app.task(bind=True, default_retry_delay=30 * 60)  # retry in 30 minutes.
def add(self, x, y):
    try:
        …
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)  # override the default and                                                 # retry in 1 minute

List of Options

Task.name

task 注册名

可以手动设置,也可以生成此name. See: Names

未完待续>>>

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java帮帮-微信公众号-技术文章全总结

Web-第三十三天 MongoDB初级学习

NoSQL(NoSQL = Not Only SQL ),意即"不仅仅是SQL"。

2312
来自专栏向治洪

Android热插拔事件处理详解

一、Android热插拔事件处理流程图 Android热插拔事件处理流程如下图所示: ? 二、组成 1. NetlinkManager:       ...

7787
来自专栏有趣的Python

9- vue django restful framework 打造生鲜超市 -用户登录和手机注册(上)

Vue+Django REST framework实战 搭建一个前后端分离的生鲜超市网站 Django rtf 完成 手机注册和用户登录(上) drf的...

81712
来自专栏测试开发架构之路

JMeter测试工具.jmx文件详解

摘要:了解.jmx文件格式类型,对jmeter二次开发与拓展有很大的帮助,当然也可以利用python对其进行一些处理(生成一些测试用例,对jmx文件进行 ”增删...

2724
来自专栏小李刀刀的专栏

[译]Laravel 5.0 之方法注入

本文译自 Matt Stauffer 的系列文章. ---- Laravel 5.0 中, 容器可以对其解析的方法进行自动分析, 然后根据类型限制把方法所需要的...

3466
来自专栏java架构师

WCF技术剖析_学习笔记之一

本系列适合新手,从0学起。共同学习,共同探讨。 基础概念 SOA:就是采用Web服务的架构 它有一些特性,需要了解: 1、自治的:不依赖于访问它的客户端和其他服...

2896
来自专栏java思维导图

开源项目renren-fast解读,让java不再难懂(二)

1、百度百科的解释: XSS又叫CSS (Cross Site Script) ,跨站脚本攻击。它指的是恶意攻击者往Web页面里插入恶意html代码,当用户浏览...

2524
来自专栏逆向技术

远程线程注入

一丶远程线程注入的讲解 远程线程注入的原理,我会写一个远程线程开发的例子 我们总共需要几步 /*1.查找窗口,获取窗口句柄*/ /*2.根据...

25010
来自专栏JadePeng的技术博客

axios介绍与使用说明 axios中文文档

本周在做一个使用vuejs的前端项目,访问后端服务使用axios库,这里对照官方文档,简单记录下,也方便大家参考。 Axios 是一个基于 Promise 的 ...

2.5K9
来自专栏和蔼的张星的图像处理专栏

1.Win10+VsCode的C/CPP编译环境搭建

我是从开始学C++的时候就一直用的是visual studio,毕竟宇宙第一IDE,写和调试都是超级方便快捷,唯一的缺点可能就是启动慢一点。 之前电脑没有换固...

7836

扫码关注云+社区

领取腾讯云代金券