前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式任务队列系统 Celery 之四

分布式任务队列系统 Celery 之四

作者头像
用户1278550
发布2018-08-09 11:29:14
8970
发布2018-08-09 11:29:14
举报
文章被收录于专栏:idbaidba

一 前言

经过前面几篇文章的介绍,我们了解到Celery的架构,运行机制以及如何调用任务等。在一个复杂的系统中,有不同的任务A,B,C :任务A执行收集几百个实例的元数据,任务B扫描实例慢查询个数,还有任务C检查待执行任务列表。这些任务耗时不同而且需要使用不同的worker去处理。默认情况下Celery会将所有的任务丢到一个队列中去处理。耗时较长的任务A反而会影响其他比较重要的任务比如任务C,导致任务C堆积。此时只用celery默认的队列就不能满足我们的需求了。

解决此问题的方式就是--china(拆哪儿) 也就是本文要讲的Celery的进阶特性 路由(Route)与队列(Queue) 将不同的task路由到不同队列,让不同的worker处理不同种类的task。

二 基础知识 --AMQP

在了解Celery队列之前,我们需要知道 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个异步消息传递所使用的应用层协议规范。AMQP的工作流如下

Exchange有几类type:

1. direct类型: direct是Exchange路由的默认类型,配置该路由规则会把消息路由到那些binding key与routing key完全匹配的Queue中。

2. topic 类型: 该类型与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中。不过topic类型的Exchange在匹配规则上进行了扩展,它引入了两个通配符 #和*前者匹配多个单词(可以为0),后者匹配一个单词。

3. fanout类型: 该类型为广播形式,它不需要指定上面的routing_key之类的东西,只要和该交换绑定的queue,统统发送出去。类似于通过交换口,就广播发出。

4. headers 类型,匹配AMQP 的头而非routing key

想要详细了解 exchange知识的朋友可以移步 https://www.cloudamqp.com/blog/2015-09-03-part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html 本文的图片转自该文。

三 Celery 路由(Route)与队列(Queue)实践

下面我们通过一个完整的例子来了解队列和路由在celery的具体使用。

3.1 核心代码

settings.py

代码语言:javascript
复制
from kombu import Exchange, Queue
BROKER_URL = 'redis://127.0.0.1:6379/3'             # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/3'  # 指定 Backend
CELERY_TIMEZONE = 'Asia/Shanghai'                     # 指定时区,默认是 UTC
# 配置四个队列,并且绑定路由的key,
CELERY_QUEUES = (
    Queue('celery'),#注意Queue('celery') 是系统默认配置的,用于存放CELERY_ROUTES未配置的任务。
    Queue("task_add", Exchange("for_add", type='direct'), routing_key="add"),
    Queue("task_mul", Exchange("for_mul", type='direct'), routing_key="mul"),
    Queue("task_mail", Exchange("for_mail", type='direct'), routing_key="mail")
)
CELERY_ROUTES = {
    'celery_app.task1.add': {"queue": "task_add", "routing_key": "add"},
    'celery_app.task2.mul': {"queue": "task_mul", "routing_key": "mul"},
    'celery_app.task1.sendmail': {"queue": "task_mail", "routing_key": "mail"}

}

CELERY_QUEUES 设置一个指定routing_key的队列,这个名字可以任意指定。

CELERY_ROUTES 设置路由,对指定的任务名,指定对应的队列和routing_key,注意 这里的routing_key需要和上面参数的一致。

task1.py

代码语言:javascript
复制
from celery_app import app
from celery.utils.log import get_logger
logger = get_logger(__name__)
@app.task
def add(x, y):
    logger.info('add {0} + {1} '.format(x, y))
    return x + y
@app.task
def sendmail(msg):
    logger.info('send email to %s' %msg)
    return 'send email to %s' %msg

task2.py

代码语言:javascript
复制
from celery_app import app
from celery.utils.log import get_logger
logger = get_logger(__name__)
@app.task
def mul(x, y):
    logger.info('mul {0} * {1} '.format(x, y))
    return x * y

3.2 启动

如果使用如下方式启动没有指定队列

要启用队列 则需要在 命令行中加上-Q 指定settings.py 中配置的队列。

celery -A celery_app worker -l info -Q task_add,task_mail,task_mul,celery

3.3 调用

In [9]: mul.apply_async(args=[2, 8],routing_key='mul') Out[9]: <AsyncResult: 8c612adb-b184-4dd2-a381-d9a41ecc5a68>

日志输出

[2018-01-01 23:48:44,784: INFO/MainProcess] Received task: celery_app.task2.mul[8c612adb-b184-4dd2-a381-d9a41ecc5a68] [2018-01-01 23:48:44,785: INFO/PoolWorker-3] mul 2 * 8 [2018-01-01 23:48:44,786: INFO/PoolWorker-3] Task celery_app.task2.mul[8c612adb-b184-4dd2-a381-d9a41ecc5a68] succeeded in 0.00100380298682s: 16

In [1]: from celery_app.task1 import add,sendmail In [2]: sendmail.apply_async(('yangyi',), routing_key='mail') Out[2]: <AsyncResult: 7a732457-aef3-4851-96dc-32fd6ca1fc45>

日志输出

[2018-01-01 23:52:05,073: INFO/MainProcess] Received task: celery_app.task1.sendmail[83766431-16c6-4586-a7fd-d7a25b7f33a0] [2018-01-01 23:52:05,075: INFO/PoolWorker-2] send email to yangyi [2018-01-01 23:52:05,076: INFO/PoolWorker-2] Task celery_app.task1.sendmail[83766431-16c6-4586-a7fd-d7a25b7f33a0] succeeded in 0.000912119983695s: 'send email to yangyi'

四 总结

本文介绍了Celery的进阶特性路由和队列,用于解决耗时不同和优先级不同的任务需求。欢迎阅读原文链接查看官方文档 。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-01-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档