前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式任务管理系统 Celery 之二

分布式任务管理系统 Celery 之二

作者头像
用户1278550
发布2018-08-09 10:59:31
8980
发布2018-08-09 10:59:31
举报
文章被收录于专栏:idbaidba

一 前言

前面一篇文章分布式任务管理系统 Celery 之一介绍了分布式任务调度队列Celery的框架以及原理,使用的例子比较简单,对实际的使用场景没有意义。本系列文章会以工程实践为例进行深入学习Celery,了解在具体工程中Celery的配置结构,调用方法,定时任务,任务队列,多机器使用Celery处理任务 。

二 实践

2.1 目录结构

➜ celery_app git:(master) ✗ tree

.

|____celery_app

| |____ __init__.py

| |____ settings.py

| |____ task1.py

| |____ task2.py

|____ client.py

其中

__init__.py 是初始化celery 实例

from celery import Celery app = Celery('demo') # 创建 Celery 实例 app.config_from_object('celery_app.settings') # 通过 Celery 实例加载配置模块

settings.py 是celery的基础配置信息,比如

BROKER_URL = 'redis://127.0.0.1:6379/3' # 指定 Broker CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/3' # 指定 Backend CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,默认是 UTC CELERY_TASK_RESULT_EXPIRES = 18000 # celery任务结果有效期 CELERY_TASK_SERIALIZER = 'json' # 任务序列化结构 CELERY_RESULT_SERIALIZER = 'json' # 结果序列化结构 CELERY_ACCEPT_CONTENT=['json'] # celery接收内容类型['pickle', 'json', 'msgpack', 'yaml'] CELERY_TIMEZONE = 'Asia/Shanghai' # celery使用的时区 CELERY_ENABLE_UTC = True # 启动时区设置 CELERYD_LOG_FILE="/var/log/celery/celery.log" # celery日志存储位置 CELERY_IMPORTS = ( # 指定导入的任务模块 'celery_app.task1', 'celery_app.task2' )

本案例只是使用到基本的配置信息,当然还有很多其他的配置比如定时任务,队列等进阶功能的配置。详细的配置可以围观 http://docs.celeryproject.org/en/3.1/configuration.html

task1.py task2.py 是对worker 业务逻辑的定义,脚本内容如下:

task1.py

import time from celery_app import app from celery.utils.log import get_logger logger = get_logger(__name__) @app.task def add(x, y): time.sleep(5) logger.info('add {0} + {1} '.format(x, y)) return x + y

task2.py

import time from celery_app import app @app.task def multiply(x, y): time.sleep(10) return x * y

client.py

from celery_app.task1 import add from celery_app.task2 import multiply #task1.add.apply_async(args=[2, 8],) # 也可用 task1.add.delay(2, 8) #task2.multiply.apply_async(args=[3, 7], ) # 也可用 task2.multiply.delay(3, 7) add.apply_async(args=[2, 8],) # 也可用 task1.add.delay(2, 8) multiply.apply_async(args=[3, 7],) # 也可用 task2.multiply.delay(3, 7) print 'hello world'

这里要特别注意的是 task的名称 引用方式

如果 使用下面这样类似绝对路径引用 函数的

from celery_app.task1 import add

可以在celery 中直接别识别。

add.apply_async(args=[2, 8],)

如果使用

from celery_app import task1

task1.add.apply_async(args=[2, 8],)

需要在Celery的配置文件导入的任务模块

CELERY_IMPORTS = ( # 指定导入的任务模块

'celery_app.task1',

'celery_app.task2'

)

否则会报错接收了未注册的任务 tasks.add

[2017-12-06 23:33:43,766: ERROR/MainProcess] Received unregistered task of type 'tasks.add'.

The message has been ignored and discarded.

详细的情况请移步 [1]

三 任务--task

tasks 执行任务的程序,完成特定的业务逻辑功能,一般由用户、触发器或其他操作将任务以消息的形式写入队列,然后交由 workers 进行处理。Celery 中要求每个task 具有不同的名称以便worker获取到消息之后能够准确的识别并且被处理。正常情况下任务消息会等待被worker认领,否则会一直存储在broker里面。如果woker 因为异常原因挂了,task 会被其他worker识别并执行。

3.1 调用任务

1 delay 方法

调用Celery的任务的方法可以直接通过 task_name.delay() 还可以使用 task_name.apply_async() 其实就是delay的封装。

task.delay(args1, args2, kwargs=value_1, kwargs2=value_2)

In [1]: from celery_app.task1 import add In [2]: r = add.delay(32,29) In [3]: r.result Out[3]: 61 In [4]: r.status Out[4]: u'SUCCESS' In [5]: r.successful() Out[5]: True In [6]: r.ready() Out[6]: True In [7]: r.wait() Out[7]: 61 In [8]: r.get() Out[8]: 61 delay 返回的是一个 AsyncResult 对象 In [15]: r Out[15]: <AsyncResult: 9719947b-b0be-411d-9b43-48743dab9953>

该对象比较常见的方法和属性

r.ready() # 返回布尔值, 任务执行完成, 返回 True, 否则返回 False.

r.wait() # 等待任务完成, 返回任务执行结果.

r.get() # 获取任务执行结果

r.result # 任务执行结果.

r.state # PENDING, START, SUCCESS

r.status # PENDING, START, SUCCESS

对于 Celery 其内建任务状态有如下几种:

参数 说明

PENDING 任务等待中

STARTED 任务已开始

SUCCESS 任务执行成功

FAILURE 任务执行失败

RETRY 任务将被重试

REVOKED 任务取消

2. apply_async方法

delay 实际上是 apply_async 的别名, 但是 apply_async 支持更多的参数:

task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})

支持的参数 :

countdown : 任务延迟执行的秒数,默认立即执行

add.apply_async((32,29), countdown=5) # 等待5秒之后执行

eta : 任务被执行的绝对时间.

add.apply_async((32,29), eta=now+tiedelta(second=20)) #调用任务之后20秒之后开始执行

expires : 设置超时时间.

add.apply_async((32,29), expires=60)

retry : 定时如果任务失败后, 是否重试.

add.apply_async((32,29), retry=False)

retry_policy : 重试策略.

max_retries : 最大重试次数, 默认为 3 次.

interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.

interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2

interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .

3.2 定时任务

Celery 提供类似linux crontab的定时任务功能,需要在配置文件中引入CELERYBEAT_SCHEDULE,Celery的计划任务schedule 可以通过datetime.timedelta 或者 crontab两种方式实现.

在 settings.py 中添加配置:

from celery.schedules import crontab from datetime import timedelta CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'celery_app.task1.add', 'schedule': timedelta(seconds=30), # 每 30 秒执行一次 'args': (5, 8), # 任务函数参数 }, 'sendmail-at-every-20-sec':{ 'task':'celery_app.task1.sendmail', 'schedule':timedelta(seconds=30), 'args': ('yangyi',), }, 'multiply-at-some-time': { 'task': 'celery_app.task1.multiply', 'schedule': crontab(hour=9, minute=50), # 每天早上 9 点 50 分执行一次 'args': (3, 7), # 任务函数参数 } }

这里需要注意的是:如果使用 datetime.timedelta 则需要在配置中加入时区信息,否则默认是以 utc 为准。国内的使用者推荐加上

CELERY_TIMEZONE = 'Asia/Shanghai' # celery使用的时区

task 必须是任务的绝对路径add方法的文件获取路径是 celery_app/task1.add

args 是一个元组,如果参数是一个字符串必须写成('youzan',) 如果写为('youzan') 则会被识别为('y','o','u','z','a','n') 导致报错说给了多个参数。

当然还有一种方式是 使用 celery 提供的periodic_task 作为装饰器,不过使用 periodic_task 不能和 CELERYBEAT_SCHEDULE同时使用。

from celery_app import app from celery.task import periodic_task from celery.utils.log import get_logger from celery.schedules import crontab logger = get_logger(__name__) @app.task def add(x, y): time.sleep(5) logger.info('add {0} + {1} '.format(x, y)) return x + y

# 每分钟调用一次 sendmail 函数

@periodic_task(run_every=(crontab(minute='*/1')), name="add_func", args='yangyi', ignore_result=True) def sendmail(msg): logger.info('start send email to %s' %msg) return 'start send email to %s' %msg

日常维护定时任务

先启动beat 监控进程

celery beat -A celery_app

然后启动 worker:

celery -A celery_app worker -l info

或者与celery app一起启动:

celery -B -A celery_app worker -l info

四 参考文档

[1] http://docs.celeryq.org/en/latest/userguide/tasks.html#task-names

[2] http://docs.celeryproject.org/en/3.1/index.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-12-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档