前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Celery中文翻译-Application

Celery中文翻译-Application

作者头像
枇杷李子橙橘柚
发布2019-05-26 11:05:06
8230
发布2019-05-26 11:05:06
举报
文章被收录于专栏:没有擅长的YcY

Celery在使用前必须实例化,称为application或app。app是线程安全的,具有不同配置、组件、task的多个Celery应用可以在同一个进程空间共存。

代码语言:javascript
复制
# 创建Celery应用
>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

最后一行文本化显示了Celery应用:包含应用所属类的名称,当前主模块名,以及内存地址。唯一重要的信息是模块名称。

Main Name

在Celery中发送task消息时,该消息仅包含要执行的task的名称。每一个worker维护一个task名称和对应函数的映射,这称为task registry

当定义一个task时,该task将注册到本地:

代码语言:javascript
复制
>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

当Celery无法检测task函数属于哪个模块时,使用main模块名生成初始task名称。

这种方式仅适用于以下两种场景:

  1. 定义task的模块作为程序运行
  2. app在python shell中创建
代码语言:javascript
复制
# tasks.py
from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()

如果直接运行tasks.py,task名将以__main__为前缀,但如果tasks.py被其他程序导入,task名将以tasks为前缀。如下:

代码语言:javascript
复制
>>> from tasks import add
>>> add.name
tasks.add

也可以直接指定主模块名:

代码语言:javascript
复制
>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

Configuration

可以通过直接设置,或使用专用配置模块对Celery进行配置。

通过app.conf属性查看或直接设置配置:

代码语言:javascript
复制
>>> app.conf.timezone
'Europe/London'

>>> app.conf.enable_utc = True

或用app.conf.update方法一次更新多个配置:

代码语言:javascript
复制
>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)

config_from_object

app.config_from_object()方法从配置模块或对象中导入配置。需要注意的是:调用config_from_object()方法将重置在这之前配置的任何设置

使用模块名 app.config_from_object()方法接收python模块的完全限定名(fully qualified name)或具体到其中的某个属性名,例如"celeryconfig", "myproj.config.celery", 或"myproj.config:CeleryConfig":

代码语言:javascript
复制
from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

只要能够正常执行import celeryconfig,app就能正常配置。

使用模块对象 也可以传入一个已导入的模块对象,但不建议这样做。

代码语言:javascript
复制
import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)

更推荐使用模块名的方式,因为这样在使用prefork pool时不需要序列化该模块。如果在实际应用中出现配置问题或序列化错误,请尝试使用模块名的方式。

使用配置类或对象

代码语言:javascript
复制
from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)

config_from_envvar

app.config_from_envvar()方法从环境变量中接收配置模块名。

代码语言:javascript
复制
import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

通过环境变量指定配置模块:

代码语言:javascript
复制
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info

Censored configuration

如果要显示Celery配置,可能需要过滤某些敏感信息如密码、密钥等。Celery提供了几种用于帮助显示配置的实用方法。

humanize() 该方法返回列表字符串形式的配置,默认只包含改动过的配置,如果要显示内置的默认配置,设置with_defaults参数为True:

代码语言:javascript
复制
>>> app.conf.humanize(with_defaults=False, censored=True)

table() 该方法返回字典形式的配置:

代码语言:javascript
复制
>>> app.conf.table(with_defaults=False, censored=True)

Celery可能不会移除所有的敏感信息,因为它使用正则表达式匹配键并判断是否移除。如果用户添加了包含敏感信息的自定义配置,可以使用Celery可能标记为敏感配置的名称来命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。

Laziness

应用实例是惰性的。

创建Celery实例只会执行以下操作:

  1. 创建用于event的logical clock instance
  2. 创建task registry
  3. 设置为当前应用(除非禁用了set_as_current参数)
  4. 调用app.on_init()回调函数(默认不执行任何操作)

app.task()装饰器不会在task定义时立即创建task,而是在task使用时或finalized应用后创建。

下例说明了在使用task或访问其属性前,都不会创建task:

代码语言:javascript
复制
>>> @app.task
>>> def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

应用的Finalization指显式地调用app.finalize()方法或隐式地访问app.tasks属性。

finalized应用将会:

  1. 复制必须在应用间共享的task。task默认是共享的,但如果禁用了task装饰器的shared属性,将属于应用私有。
  2. 评估所有待处理的task装饰器
  3. 确保所有task绑定到当前应用。将task绑定到某个应用,以便可以从配置中读取默认值。

Breaking the chain

虽然可以依赖于当前应用,但最佳实践是将应用实例传递给任何需要它的对象,这个行为可以称为app chain

代码语言:javascript
复制
# 依赖于当前应用(bad)

from celery import current_app

class Scheduler(object):

    def run(self):
        app = current_app
代码语言:javascript
复制
# 传递应用实例(good)

class Scheduler(object):

    def __init__(self, app):
        self.app = app

在开发模式设置CELERY_TRACE_APP环境变量,可以在应用链断开时抛出异常:

代码语言:javascript
复制
$ CELERY_TRACE_APP=1 celery worker -l info

Abstract Tasks

使用task()装饰器创建的task都继承自celery.app.task模块的Task基类。继承该类可以自定义task类:

代码语言:javascript
复制
from celery import Task
# 或者 from celery.app.task import Task

class DebugTask(Task):

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return super(DebugTask, self).__call__(*args, **kwargs)

如果要重写__call__()方法,记得调用super。这样在task直接调用时会执行基类的默认事件。

Task基类是特殊的,因为它并未绑定到任何特定的应用。一旦task绑定到应用,它将读取配置以设置默认值等。

  1. 通过base参数指定基类 @app.task(base=DebugTask) def add(x, y): return x + y
  2. 通过app.Task属性指定基类 >>> from celery import Celery, Task >>> app = Celery() >>> class MyBaseTask(Task): ... queue = 'hipri' >>> app.Task = MyBaseTask >>> app.Task <unbound MyBaseTask> >>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.__class__.mro() [<class add of <Celery __main__:0x1012b4410>>, <unbound MyBaseTask>, <unbound Task>, <type 'object'>]
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Main Name
  • Configuration
    • config_from_object
      • config_from_envvar
      • Censored configuration
      • Laziness
      • Breaking the chain
      • Abstract Tasks
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档