专栏首页idba分布式任务队列系统 Celery 之四

分布式任务队列系统 Celery 之四

一 前言

经过前面几篇文章的介绍,我们了解到Celery的架构,运行机制以及如何调用任务等。在一个复杂的系统中,有不同的任务A,B,C :任务A执行收集几百个实例的元数据,任务B扫描实例慢查询个数,还有任务C检查待执行任务列表。这些任务耗时不同而且需要使用不同的worker去处理。默认情况下Celery会将所有的任务丢到一个队列中去处理。耗时较长的任务A反而会影响其他比较重要的任务比如任务C,导致任务C堆积。此时只用celery默认的队列就不能满足我们的需求了。

解决此问题的方式就是--china(拆哪儿) 也就是本文要讲的Celery的进阶特性 路由(Route)与队列(Queue) 将不同的task路由到不同队列,让不同的worker处理不同种类的task。

二 基础知识 --AMQP

在了解Celery队列之前,我们需要知道 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个异步消息传递所使用的应用层协议规范。AMQP的工作流如下

Exchange有几类type:

1. direct类型: direct是Exchange路由的默认类型,配置该路由规则会把消息路由到那些binding key与routing key完全匹配的Queue中。

2. topic 类型: 该类型与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中。不过topic类型的Exchange在匹配规则上进行了扩展,它引入了两个通配符 #和*前者匹配多个单词(可以为0),后者匹配一个单词。

3. fanout类型: 该类型为广播形式,它不需要指定上面的routing_key之类的东西,只要和该交换绑定的queue,统统发送出去。类似于通过交换口,就广播发出。

4. headers 类型,匹配AMQP 的头而非routing key

想要详细了解 exchange知识的朋友可以移步 https://www.cloudamqp.com/blog/2015-09-03-part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html 本文的图片转自该文。

三 Celery 路由(Route)与队列(Queue)实践

下面我们通过一个完整的例子来了解队列和路由在celery的具体使用。

3.1 核心代码

settings.py

from kombu import Exchange, Queue
BROKER_URL = 'redis://127.0.0.1:6379/3'             # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/3'  # 指定 Backend
CELERY_TIMEZONE = 'Asia/Shanghai'                     # 指定时区,默认是 UTC
# 配置四个队列,并且绑定路由的key,
CELERY_QUEUES = (
    Queue('celery'),#注意Queue('celery') 是系统默认配置的,用于存放CELERY_ROUTES未配置的任务。
    Queue("task_add", Exchange("for_add", type='direct'), routing_key="add"),
    Queue("task_mul", Exchange("for_mul", type='direct'), routing_key="mul"),
    Queue("task_mail", Exchange("for_mail", type='direct'), routing_key="mail")
)
CELERY_ROUTES = {
    'celery_app.task1.add': {"queue": "task_add", "routing_key": "add"},
    'celery_app.task2.mul': {"queue": "task_mul", "routing_key": "mul"},
    'celery_app.task1.sendmail': {"queue": "task_mail", "routing_key": "mail"}

}

CELERY_QUEUES 设置一个指定routing_key的队列,这个名字可以任意指定。

CELERY_ROUTES 设置路由,对指定的任务名,指定对应的队列和routing_key,注意 这里的routing_key需要和上面参数的一致。

task1.py

from celery_app import app
from celery.utils.log import get_logger
logger = get_logger(__name__)
@app.task
def add(x, y):
    logger.info('add {0} + {1} '.format(x, y))
    return x + y
@app.task
def sendmail(msg):
    logger.info('send email to %s' %msg)
    return 'send email to %s' %msg

task2.py

from celery_app import app
from celery.utils.log import get_logger
logger = get_logger(__name__)
@app.task
def mul(x, y):
    logger.info('mul {0} * {1} '.format(x, y))
    return x * y

3.2 启动

如果使用如下方式启动没有指定队列

要启用队列 则需要在 命令行中加上-Q 指定settings.py 中配置的队列。

celery -A celery_app worker -l info -Q task_add,task_mail,task_mul,celery

3.3 调用

In [9]: mul.apply_async(args=[2, 8],routing_key='mul') Out[9]: <AsyncResult: 8c612adb-b184-4dd2-a381-d9a41ecc5a68>

日志输出

[2018-01-01 23:48:44,784: INFO/MainProcess] Received task: celery_app.task2.mul[8c612adb-b184-4dd2-a381-d9a41ecc5a68] [2018-01-01 23:48:44,785: INFO/PoolWorker-3] mul 2 * 8 [2018-01-01 23:48:44,786: INFO/PoolWorker-3] Task celery_app.task2.mul[8c612adb-b184-4dd2-a381-d9a41ecc5a68] succeeded in 0.00100380298682s: 16

In [1]: from celery_app.task1 import add,sendmail In [2]: sendmail.apply_async(('yangyi',), routing_key='mail') Out[2]: <AsyncResult: 7a732457-aef3-4851-96dc-32fd6ca1fc45>

日志输出

[2018-01-01 23:52:05,073: INFO/MainProcess] Received task: celery_app.task1.sendmail[83766431-16c6-4586-a7fd-d7a25b7f33a0] [2018-01-01 23:52:05,075: INFO/PoolWorker-2] send email to yangyi [2018-01-01 23:52:05,076: INFO/PoolWorker-2] Task celery_app.task1.sendmail[83766431-16c6-4586-a7fd-d7a25b7f33a0] succeeded in 0.000912119983695s: 'send email to yangyi'

四 总结

本文介绍了Celery的进阶特性路由和队列,用于解决耗时不同和优先级不同的任务需求。欢迎阅读原文链接查看官方文档 。

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-01-03

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 分布式任务管理系统 Celery 之二

    前面一篇文章分布式任务管理系统 Celery 之一介绍了分布式任务调度队列Celery的框架以及原理,使用的例子比较简单,对实际的使用场景没有意义。本系列文章...

    用户1278550
  • Redis 6.0 新特性概览

    Redis 6 RC2 于今年3月5号Release,预计今年4.30月份发布GA版本,官方网站提供 unstable 版本的供大家测试,本文基于官方文档介绍R...

    用户1278550
  • Redis 删除1.2亿指定前缀的key

    因为更换IDC的原因,我们需要迁移缓存到新的机房,开发同学提出老的缓存有1.2亿无效(未设置过期时间)的key和正常在用的业务key,在迁移之前可以先指定前缀将...

    用户1278550
  • python celery 模块

    Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度

    py3study
  • 深度| 解密面部特征点检测的关键技术

    面部特征点定位任务即根据输入的人脸图像,自动定位出面部关键特征点,如眼睛、鼻尖、嘴角点、眉毛以及人脸各部件轮廓点等,如下图所示。 ? 这项技术的应用很广泛,比如...

    AI科技评论
  • 面部特征点定位概述及最近研究进展

    面部特征点定位任务即根据输入的人脸图像,自动定位出面部关键特征点,如眼睛、鼻尖、嘴角点、眉毛以及人脸各部件轮廓点等,如下图所示。 ? 这项技术...

    智能算法
  • 一枝独秀,报告显示SAP依然引领一线ERP市场

    长久以来,SAP、Oracle等公司一直就是整个ERP的代名词,但随着云计算与云ERP的兴起,人们普遍看好后者会超越前者,甚至连Salesforce和Workd...

    人称T客
  • 主从分布式爬虫

    最常用的一种就是主从分布式爬虫,本文将使用Redis服务器来作为任务队列。 如图:

    py3study
  • wpscan扫描工具

    WPScan是一个扫描WordPress漏洞的黑盒子扫描器,可以扫描出wordpress的版本,主题,插件,后台用户以及爆破后台用户密码等,Kali Linux...

    轩辕小子
  • OpenAI“大力出奇迹”的GPT2同样适用图像领域,训练性能良好

    GPT-3的热度还在发酵,OpenAI又放了个大招。这次的研究往图像界迈出了新的一步。

    大数据文摘

扫码关注云+社区

领取腾讯云代金券