一 前言
经过前面几篇文章的介绍,我们了解到Celery的架构,运行机制以及如何调用任务等。在一个复杂的系统中,有不同的任务A,B,C :任务A执行收集几百个实例的元数据,任务B扫描实例慢查询个数,还有任务C检查待执行任务列表。这些任务耗时不同而且需要使用不同的worker去处理。默认情况下Celery会将所有的任务丢到一个队列中去处理。耗时较长的任务A反而会影响其他比较重要的任务比如任务C,导致任务C堆积。此时只用celery默认的队列就不能满足我们的需求了。
解决此问题的方式就是--china(拆哪儿) 也就是本文要讲的Celery的进阶特性 路由(Route)与队列(Queue) 将不同的task路由到不同队列,让不同的worker处理不同种类的task。
二 基础知识 --AMQP
在了解Celery队列之前,我们需要知道 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个异步消息传递所使用的应用层协议规范。AMQP的工作流如下
Exchange有几类type:
1. direct类型: direct是Exchange路由的默认类型,配置该路由规则会把消息路由到那些binding key与routing key完全匹配的Queue中。
2. topic 类型: 该类型与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中。不过topic类型的Exchange在匹配规则上进行了扩展,它引入了两个通配符 #和*前者匹配多个单词(可以为0),后者匹配一个单词。
3. fanout类型: 该类型为广播形式,它不需要指定上面的routing_key之类的东西,只要和该交换绑定的queue,统统发送出去。类似于通过交换口,就广播发出。
4. headers 类型,匹配AMQP 的头而非routing key
想要详细了解 exchange知识的朋友可以移步 https://www.cloudamqp.com/blog/2015-09-03-part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html 本文的图片转自该文。
三 Celery 路由(Route)与队列(Queue)实践
下面我们通过一个完整的例子来了解队列和路由在celery的具体使用。
3.1 核心代码
settings.py
from kombu import Exchange, Queue
BROKER_URL = 'redis://127.0.0.1:6379/3' # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/3' # 指定 Backend
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,默认是 UTC
# 配置四个队列,并且绑定路由的key,
CELERY_QUEUES = (
Queue('celery'),#注意Queue('celery') 是系统默认配置的,用于存放CELERY_ROUTES未配置的任务。
Queue("task_add", Exchange("for_add", type='direct'), routing_key="add"),
Queue("task_mul", Exchange("for_mul", type='direct'), routing_key="mul"),
Queue("task_mail", Exchange("for_mail", type='direct'), routing_key="mail")
)
CELERY_ROUTES = {
'celery_app.task1.add': {"queue": "task_add", "routing_key": "add"},
'celery_app.task2.mul': {"queue": "task_mul", "routing_key": "mul"},
'celery_app.task1.sendmail': {"queue": "task_mail", "routing_key": "mail"}
}
CELERY_QUEUES 设置一个指定routing_key的队列,这个名字可以任意指定。
CELERY_ROUTES 设置路由,对指定的任务名,指定对应的队列和routing_key,注意 这里的routing_key需要和上面参数的一致。
task1.py
from celery_app import app
from celery.utils.log import get_logger
logger = get_logger(__name__)
@app.task
def add(x, y):
logger.info('add {0} + {1} '.format(x, y))
return x + y
@app.task
def sendmail(msg):
logger.info('send email to %s' %msg)
return 'send email to %s' %msg
task2.py
from celery_app import app
from celery.utils.log import get_logger
logger = get_logger(__name__)
@app.task
def mul(x, y):
logger.info('mul {0} * {1} '.format(x, y))
return x * y
3.2 启动
如果使用如下方式启动没有指定队列
要启用队列 则需要在 命令行中加上-Q 指定settings.py 中配置的队列。
celery -A celery_app worker -l info -Q task_add,task_mail,task_mul,celery
3.3 调用
In [9]: mul.apply_async(args=[2, 8],routing_key='mul') Out[9]: <AsyncResult: 8c612adb-b184-4dd2-a381-d9a41ecc5a68>
日志输出
[2018-01-01 23:48:44,784: INFO/MainProcess] Received task: celery_app.task2.mul[8c612adb-b184-4dd2-a381-d9a41ecc5a68] [2018-01-01 23:48:44,785: INFO/PoolWorker-3] mul 2 * 8 [2018-01-01 23:48:44,786: INFO/PoolWorker-3] Task celery_app.task2.mul[8c612adb-b184-4dd2-a381-d9a41ecc5a68] succeeded in 0.00100380298682s: 16
In [1]: from celery_app.task1 import add,sendmail In [2]: sendmail.apply_async(('yangyi',), routing_key='mail') Out[2]: <AsyncResult: 7a732457-aef3-4851-96dc-32fd6ca1fc45>
日志输出
[2018-01-01 23:52:05,073: INFO/MainProcess] Received task: celery_app.task1.sendmail[83766431-16c6-4586-a7fd-d7a25b7f33a0] [2018-01-01 23:52:05,075: INFO/PoolWorker-2] send email to yangyi [2018-01-01 23:52:05,076: INFO/PoolWorker-2] Task celery_app.task1.sendmail[83766431-16c6-4586-a7fd-d7a25b7f33a0] succeeded in 0.000912119983695s: 'send email to yangyi'
四 总结
本文介绍了Celery的进阶特性路由和队列,用于解决耗时不同和优先级不同的任务需求。欢迎阅读原文链接查看官方文档 。