前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Celery用户手册 - Tasks

Celery用户手册 - Tasks

作者头像
用户1416054
发布2018-08-02 12:02:44
1.5K0
发布2018-08-02 12:02:44
举报
文章被收录于专栏:JackeyGao的博客JackeyGao的博客

Celery用户手册 - Tasks

Posted April 19, 2016

Tasks是Celery 应用的构建块。事实上Celery应用是由一个或多个Task拼装组成的。

一个Task即是一个对象, Task被创建后可以被所有调用, 它是双重角色, 当Task被调用可以通过Task可以发送消息, 同时当作为一个worker的时候可以接收消息,并消费。

每个Task name 都是唯一的, 这样可以通过这个名字,找到合适的function去执行消费。

当发送一个任务消息在worker确认(acknowledged)前不会消失,一个worker可以提前存储很多消息,如果worker进程崩溃或killed,消息也不会消失, 消息会通过在投递的方式给其他存活worker。

理想的Task函数必须是幂等的,这意味着相同的参数调用多次不会出现不同的结果。但是worker并不知道函数是幂等的, woker默认是提前确认消息, 在执行完成之前这个task永远不会被重复执行。 这个就是上锁(LOCK)意思。这一段和上一段还是有区别的, 这一段强调的是开始执行之前确认

当然确认如果任务是幂等的,你可以设置acks_late选项来控制worker 在函数返回之后去确认消息acknowledge. 请参考: Should I use retry or acks_late?

Basics

你可以很容易的创建任务在任何的可调用函数上使用task()装饰器.

Python

代码语言:javascript
复制
from .models import User

@app.task
def create_user(username, password):
    User.objects.create(username=username, password=password)

可以在装饰器上指定参数, 来设置Task

Python

代码语言:javascript
复制
@app.task(serializer='json')
def create_user(username, password):
    User.objects.create(username=username, password=password)

Multiple decorators

当使用多个装饰器,需要确保任务装饰生效, 把task decorator 写在函数第一个装饰器.

Python

代码语言:javascript
复制
@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y

如何引入task装饰器?

task decorator 存在于你的Celery应用的实例上, 上一节我们已经讲过如何声明Application和使用它.

如果你使用的是Django 或者仍然适用老的版本, 你可能导入task decorator的方式是下面这样.

Python

代码语言:javascript
复制
from celery import task

@task
def add(x, y):
    return x + y

Names

每个任务都有一个唯一的名称, 一个任务创建时如果不提供一个自定义的名字, 将会去生成一个任务.

Python

代码语言:javascript
复制
>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
...     return x + y

>>> add.name
'sum-of-two-numbers'

最好的方式是适用模块名称作为一个名称空间,如果一个任务另外一个模块中也有这样的名称如user模块中有add, group 模块中也有add, 那么这样就会冲突. 最好的方式如下:

Python

代码语言:javascript
复制
>>> @app.task(name='tasks.add')
>>> def add(x, y):
...     return x + y

你可以通过调用task的属性来获取task name.

Python

代码语言:javascript
复制
>>> add.name
'tasks.add'

如果不指定, 默认也会通过模块名和函数名拼装生成name

tasks.py:

Python

代码语言:javascript
复制
@app.task
def add(x, y):
    return x + y

>>> from tasks import add
>>> add.name
'tasks.add'

自动命名和相对import

相对import和自动命名不能一起工作, 所以如果适用相对引入你必须精确的设置name.

如果一个客户端(创建消息的时候) 导入这个myapp.tasks 通过.tasks导入,另外一个worker导入模块通过myapp.tasks, 生成的名称不匹配导致worker会抛出NotRegistered 从而不能工作.

Django INSTALLED_APPS的project.myapp风格.

Python

代码语言:javascript
复制
INSTALLED_APPS = ['project.myapp']

如果你安装app使用project.myapp, 那么task导入的时候也要通过project.myapp.tasks导入, 所以你要确保总是使用相同的名称导入任务.

Python

代码语言:javascript
复制
>>> from project.myapp.tasks import mytask   # << GOOD

>>> from myapp.tasks import mytask    # << BAD!!!

上面第二个例子将导致任务以不同的方式命名, 进而导致客户端和worker不用的任务名称。

Python

代码语言:javascript
复制
>>> from project.myapp.tasks import mytask
>>> mytask.name
'project.myapp.tasks.mytask'

>>> from myapp.tasks import mytask
>>> mytask.name
'myapp.tasks.mytask'

因而你导入必须一致, 这也是python推荐的方式。

同样的你不应该使用旧风格进行相对引入.

Python

代码语言:javascript
复制
from module import foo   # BAD!

from proj.module import foo  # GOOD!

以下新的风格相对引入也是可以推荐的

Python

代码语言:javascript
复制
from .module import foo  # GOOD!

如果你的程序已经做了错的引入, 并且你没有时间去重构, 建议通过显式的指定名称去覆盖自动命名.

Python

代码语言:javascript
复制
@task(name='proj.tasks.add')
def add(x, y):
    return x + y

Context

执行任务request 包含的信息和状态.

request定义了以下属性

key

value

id:

执行任务的唯一ID

group:

组id如果属于组

chord:

The unique id of the chord this task belongs to (if the task is part of the header).

args:

位置参数

kwargs:

键值参数

retries:

重拾次数

is_eager:

如果不是工人是本地客户端,设置为True

eta:

预计任务时间

expires:

任务的过期时

logfile:

worker 的日志文件, See: logging

loglevel:

当前使用的日志级别

hostname:

worker实例的hostname

delivery_info:

额外的传递信息

called_directly:

This flag is set to true if the task was not executed by the worker.

callbacks:

回调函数

errback:

异常回调函数

utc:

如果为True说明调用者启动了utc

3.1 新属性 key|value ---|--- headers:|映射消息头 reply_to:|发送replay到哪个队列 correlation_id:|通常与任务id通用, 常用语amqp的跟踪回复

一个从context获取获取信息的例子:

Python

代码语言:javascript
复制
@app.task(bind=True)
def dump_context(self, x, y):
    print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
            self.request))

Logging

worker 将会自动配置logging, 也可以手动配置定制logging 日志输出.

Celery 提供一个名为celery.task的logger供使用, 你可以通过这个logger 自动的生成一个名称和唯一id作为日志的一部分.

推荐在每个模块中都声明一个logger, 每个模块使用单独的logger.

Python

代码语言:javascript
复制
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y

Celery 使用Python标准库的logging模块, 文档支持可以在logging 模块中看到

你也可以使用print(), 任何写入标准输出和标准错误都会转到日志系统。 所以print的字符也会作为日志记录, 记录等级为WARN.

Retrying

retry() 可以重试任务, 当任务出现可恢复的错误.

当调用retry()时将会发送一个新的消息, 使用相同的task-id, 确保消息和原始任务属于相同的队列.

当一个消息重试后, 任务也会记录一个状态。这样你可以使用结果实例跟踪任务状态记录(see States)

一个使用retry() 的例子:

Python

代码语言:javascript
复制
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

bind 参数告诉装饰器将会给一个可以访问的self实例.

当存储任务结果,exc 是用于传递异常信息用户日志输出。 内容有异常信息和traceback信息都存在于exc.

如果此任务有max_retries值, 并且重试次数超过了这个值, 那么这个exc异常将会重新raise. 如果是下列情况将不会这样:

  • exc 没有指定

这种情况下将会raise MaxRetriesExceeded异常, 这个是默认异常

  • 没有异常

当重试没有异常发生(也就是上面except没有发生), 重试次数达到了, 但task还没有正确返回, 可以指定给exc一个异常, 用于代理默认的MaxRetriesExceeded.

Python

代码语言:javascript
复制
self.retry(exc=Twitter.LoginError())

将会触发提供的异常信息。

自定义重试间隔

当一个任务要去重试, 可以指定一个时间之后再去重试. 使用default_retry_delay属性来设置默认延迟.默认是三分钟, 注意: 延迟的单位是秒.

你可以使用 retry(..., countdown=60s)来覆盖task级别的default_retry_delay时间. 两种方法灵活使用

Python

代码语言:javascript
复制
@app.task(bind=True, default_retry_delay=30 * 60)  # retry in 30 minutes.
def add(self, x, y):
    try:
        …
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)  # override the default and                                                 # retry in 1 minute

List of Options

Task.name

task 注册名

可以手动设置,也可以生成此name. See: Names

未完待续>>>

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Celery用户手册 - Tasks
    • Basics
      • Names
        • Context
          • Logging
            • Retrying
              • List of Options
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档