专栏首页idba分布式任务管理系统 Celery 之二

分布式任务管理系统 Celery 之二

一 前言

前面一篇文章分布式任务管理系统 Celery 之一介绍了分布式任务调度队列Celery的框架以及原理,使用的例子比较简单,对实际的使用场景没有意义。本系列文章会以工程实践为例进行深入学习Celery,了解在具体工程中Celery的配置结构,调用方法,定时任务,任务队列,多机器使用Celery处理任务 。

二 实践

2.1 目录结构

➜ celery_app git:(master) ✗ tree

.

|____celery_app

| |____ __init__.py

| |____ settings.py

| |____ task1.py

| |____ task2.py

|____ client.py

其中

__init__.py 是初始化celery 实例

from celery import Celery app = Celery('demo') # 创建 Celery 实例 app.config_from_object('celery_app.settings') # 通过 Celery 实例加载配置模块

settings.py 是celery的基础配置信息,比如

BROKER_URL = 'redis://127.0.0.1:6379/3' # 指定 Broker CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/3' # 指定 Backend CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,默认是 UTC CELERY_TASK_RESULT_EXPIRES = 18000 # celery任务结果有效期 CELERY_TASK_SERIALIZER = 'json' # 任务序列化结构 CELERY_RESULT_SERIALIZER = 'json' # 结果序列化结构 CELERY_ACCEPT_CONTENT=['json'] # celery接收内容类型['pickle', 'json', 'msgpack', 'yaml'] CELERY_TIMEZONE = 'Asia/Shanghai' # celery使用的时区 CELERY_ENABLE_UTC = True # 启动时区设置 CELERYD_LOG_FILE="/var/log/celery/celery.log" # celery日志存储位置 CELERY_IMPORTS = ( # 指定导入的任务模块 'celery_app.task1', 'celery_app.task2' )

本案例只是使用到基本的配置信息,当然还有很多其他的配置比如定时任务,队列等进阶功能的配置。详细的配置可以围观 http://docs.celeryproject.org/en/3.1/configuration.html

task1.py task2.py 是对worker 业务逻辑的定义,脚本内容如下:

task1.py

import time from celery_app import app from celery.utils.log import get_logger logger = get_logger(__name__) @app.task def add(x, y): time.sleep(5) logger.info('add {0} + {1} '.format(x, y)) return x + y

task2.py

import time from celery_app import app @app.task def multiply(x, y): time.sleep(10) return x * y

client.py

from celery_app.task1 import add from celery_app.task2 import multiply #task1.add.apply_async(args=[2, 8],) # 也可用 task1.add.delay(2, 8) #task2.multiply.apply_async(args=[3, 7], ) # 也可用 task2.multiply.delay(3, 7) add.apply_async(args=[2, 8],) # 也可用 task1.add.delay(2, 8) multiply.apply_async(args=[3, 7],) # 也可用 task2.multiply.delay(3, 7) print 'hello world'

这里要特别注意的是 task的名称 引用方式

如果 使用下面这样类似绝对路径引用 函数的

from celery_app.task1 import add

可以在celery 中直接别识别。

add.apply_async(args=[2, 8],)

如果使用

from celery_app import task1

task1.add.apply_async(args=[2, 8],)

需要在Celery的配置文件导入的任务模块

CELERY_IMPORTS = ( # 指定导入的任务模块

'celery_app.task1',

'celery_app.task2'

)

否则会报错接收了未注册的任务 tasks.add

[2017-12-06 23:33:43,766: ERROR/MainProcess] Received unregistered task of type 'tasks.add'.

The message has been ignored and discarded.

详细的情况请移步 [1]

三 任务--task

tasks 执行任务的程序,完成特定的业务逻辑功能,一般由用户、触发器或其他操作将任务以消息的形式写入队列,然后交由 workers 进行处理。Celery 中要求每个task 具有不同的名称以便worker获取到消息之后能够准确的识别并且被处理。正常情况下任务消息会等待被worker认领,否则会一直存储在broker里面。如果woker 因为异常原因挂了,task 会被其他worker识别并执行。

3.1 调用任务

1 delay 方法

调用Celery的任务的方法可以直接通过 task_name.delay() 还可以使用 task_name.apply_async() 其实就是delay的封装。

task.delay(args1, args2, kwargs=value_1, kwargs2=value_2)

In [1]: from celery_app.task1 import add In [2]: r = add.delay(32,29) In [3]: r.result Out[3]: 61 In [4]: r.status Out[4]: u'SUCCESS' In [5]: r.successful() Out[5]: True In [6]: r.ready() Out[6]: True In [7]: r.wait() Out[7]: 61 In [8]: r.get() Out[8]: 61 delay 返回的是一个 AsyncResult 对象 In [15]: r Out[15]: <AsyncResult: 9719947b-b0be-411d-9b43-48743dab9953>

该对象比较常见的方法和属性

r.ready() # 返回布尔值, 任务执行完成, 返回 True, 否则返回 False.

r.wait() # 等待任务完成, 返回任务执行结果.

r.get() # 获取任务执行结果

r.result # 任务执行结果.

r.state # PENDING, START, SUCCESS

r.status # PENDING, START, SUCCESS

对于 Celery 其内建任务状态有如下几种:

参数 说明

PENDING 任务等待中

STARTED 任务已开始

SUCCESS 任务执行成功

FAILURE 任务执行失败

RETRY 任务将被重试

REVOKED 任务取消

2. apply_async方法

delay 实际上是 apply_async 的别名, 但是 apply_async 支持更多的参数:

task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})

支持的参数 :

countdown : 任务延迟执行的秒数,默认立即执行

add.apply_async((32,29), countdown=5) # 等待5秒之后执行

eta : 任务被执行的绝对时间.

add.apply_async((32,29), eta=now+tiedelta(second=20)) #调用任务之后20秒之后开始执行

expires : 设置超时时间.

add.apply_async((32,29), expires=60)

retry : 定时如果任务失败后, 是否重试.

add.apply_async((32,29), retry=False)

retry_policy : 重试策略.

max_retries : 最大重试次数, 默认为 3 次.

interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.

interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2

interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .

3.2 定时任务

Celery 提供类似linux crontab的定时任务功能,需要在配置文件中引入CELERYBEAT_SCHEDULE,Celery的计划任务schedule 可以通过datetime.timedelta 或者 crontab两种方式实现.

在 settings.py 中添加配置:

from celery.schedules import crontab from datetime import timedelta CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'celery_app.task1.add', 'schedule': timedelta(seconds=30), # 每 30 秒执行一次 'args': (5, 8), # 任务函数参数 }, 'sendmail-at-every-20-sec':{ 'task':'celery_app.task1.sendmail', 'schedule':timedelta(seconds=30), 'args': ('yangyi',), }, 'multiply-at-some-time': { 'task': 'celery_app.task1.multiply', 'schedule': crontab(hour=9, minute=50), # 每天早上 9 点 50 分执行一次 'args': (3, 7), # 任务函数参数 } }

这里需要注意的是:如果使用 datetime.timedelta 则需要在配置中加入时区信息,否则默认是以 utc 为准。国内的使用者推荐加上

CELERY_TIMEZONE = 'Asia/Shanghai' # celery使用的时区

task 必须是任务的绝对路径add方法的文件获取路径是 celery_app/task1.add

args 是一个元组,如果参数是一个字符串必须写成('youzan',) 如果写为('youzan') 则会被识别为('y','o','u','z','a','n') 导致报错说给了多个参数。

当然还有一种方式是 使用 celery 提供的periodic_task 作为装饰器,不过使用 periodic_task 不能和 CELERYBEAT_SCHEDULE同时使用。

from celery_app import app from celery.task import periodic_task from celery.utils.log import get_logger from celery.schedules import crontab logger = get_logger(__name__) @app.task def add(x, y): time.sleep(5) logger.info('add {0} + {1} '.format(x, y)) return x + y

# 每分钟调用一次 sendmail 函数

@periodic_task(run_every=(crontab(minute='*/1')), name="add_func", args='yangyi', ignore_result=True) def sendmail(msg): logger.info('start send email to %s' %msg) return 'start send email to %s' %msg

日常维护定时任务

先启动beat 监控进程

celery beat -A celery_app

然后启动 worker:

celery -A celery_app worker -l info

或者与celery app一起启动:

celery -B -A celery_app worker -l info

四 参考文档

[1] http://docs.celeryq.org/en/latest/userguide/tasks.html#task-names

[2] http://docs.celeryproject.org/en/3.1/index.html

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-12-17

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Redis 6.0 新特性概览

    Redis 6 RC2 于今年3月5号Release,预计今年4.30月份发布GA版本,官方网站提供 unstable 版本的供大家测试,本文基于官方文档介绍R...

    用户1278550
  • 分布式任务管理系统 Celery 之三

    前面一篇文章 分布式任务管理系统 Celery 之二 以工程实践为例进行深入学习Celery,介绍工程中Celery的配置结构,调用方法,定时任务相关知...

    用户1278550
  • 分布式任务队列系统 Celery 之四

    经过前面几篇文章的介绍,我们了解到Celery的架构,运行机制以及如何调用任务等。在一个复杂的系统中,有不同的任务A,B,C :任务A执行收集几百个实例的元...

    用户1278550
  • 用Eclipse连接正在运行的Tomcat进行Debug的步骤

    在弹出的窗口中左侧双击“Remote Java Application”,然后再右侧显示的界面中选择设置“Project”、“Host”和“Port”(888...

    LeoXu
  • Qt官方示例-使用布局

      通常,子窗口小部件使用布局对象而不是通过显式指定位置和大小来安排在窗口内。在这里,我们构造了一个QLabel和QLineEdit控件并使用QHBoxLayo...

    Qt君
  • jQuery 的“原型污染”安全漏洞

    前两周发布的 jQuery 3.4.0 除了常规更新外,更重要的是修复了一个称为“原型污染(prototype pollution)”的罕见安全漏洞。

    周俊辉
  • 使用JPA中@Query 注解实现update 操作

    spring使用jpa进行update操作主要有两种方式: 1、调用保存实体的方法 1)保存一个实体:repository.save(T entity) 2)保...

    hbbliyong
  • Linux下通过受限bash创建指定权限的账号

    在日常业务运维中,有时为了配合解决问题,需要给非运维人员开通系统账号,用于查询日志或代码。通常为了系统安全或避免不必要的误操作等目的,会将账号权限降至最低。下面...

    洗尽了浮华
  • Nodejs学习笔记(十六)--- Pomelo介绍&入门

    前言&介绍 Pomelo:一个快速、可扩展、Node.js分布式游戏服务器框架 从三四年前接触Node.js开始就接触到了Pomelo,从Pomelo最...

    Porschev
  • SAS X Command Execute Python Code

    小编最近在潜心研究外部数据导入SAS,深感Excel的导入的不便利,想实现程序控制将Excel改为CSV在通过CSV导入SAS。想着想着,就想到用外部语言来实现...

    Setup

扫码关注云+社区

领取腾讯云代金券