前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Celery多个定时任务使用RabbitMQ,Queue冲突解决

Celery多个定时任务使用RabbitMQ,Queue冲突解决

作者头像
Python碎片公众号
发布2021-02-26 14:14:28
1.1K0
发布2021-02-26 14:14:28
举报
文章被收录于专栏:Python碎片公众号的专栏

一. 场景描述

1.使用celery实现定时任务后,任务会被定时添加到后端指定的队列里,队列可以是RabbitMQ,也可以是redis.

2.在创建Celery对象app的时候,指定了使用rabbitmq作为后端代理broker, celery会将定时任务异步添加到mq队列中,worker从队列中获取任务.

3.如果已经运行了一个celery定时任务A,定时任务A使用mq,此时要新增另一个celery定时任务B,定时任务B也直接使用mq,那么两个不同的定时任务在使用同一个队列,会出现任务混乱.

因为worker执行完任务后会自动去队列中取任务,也就是说,任务A的worker可能会从队列中获取到任务B的任务,任务B的worker也可能会从队列中获取到任务A的任务.

4.当worker获取到的任务不是本项目的任务时,程序就会报错.

二. 解决queue冲突的方法和原理

1.Celery会自动识别任务,自动将定时任务添加到队列.

2.Queue(队列)是RabbitMQ的内部对象,用于存储任务.

3.但celery不是直接将任务放到Queue(队列)中,而是先通过Exchange, Exchange控制任务存放到队列的路由Route,不同的Route指向不同的Queue.

4.使用者可以自定义不同的Queue和Route,并指定Queue和Route的对应关系,用来指定不同定时任务存放到不同队列.

5.在定时任务的配置文件中指定Queue和Route,Exchange就会将定时任务添加到对应的队列,worker也会到这个队列中取任务,避免冲突.

三. 编写代码解决Queue冲突

1.celery定时任务目录结构

代码语言:javascript
复制
# 目录结构
- celery_crontab
    - config.py
    - main.py
    - tasks.py

2.在tasks.py中编写任务函数代码

代码语言:javascript
复制
from config import app


@app.task
def crontab_func1():
    print('在此编写任务要实现的代码')
  
  
@app.task
def crontab_func2():
    print('在此调用实现了定时任务功能的函数或方法')
  

3.在配置文件config.py中自定义Queue,Route,并将Queue和Route配置到Celery对象中

代码语言:javascript
复制
from celery import Celery
from kombu import Exchange, Queue


# celery
app = Celery('demo', broker='amqp://guest@localhost:5672//')
# Queue
queue = (
    # 定义专用的queue,定义Exchange,以及与route对应的key
    Queue('queue_demo', Exchange('exchange_demo', type='direct'),
    routing_key='queue_demo_key'),
)
# Route
route = {
    # 定义任务crontab_func1的queue,routing_key
    'tasks.crontab_func1': {'queue': 'queue_demo', 'routing_key': 'queue_demo_key'},
    'tasks.crontab_func2': {'queue': 'queue_demo', 'routing_key': 'queue_demo_key'},
}


# 指定queue和route的配置应用到celery定时任务的配置中
app.conf.update(CELERY_QUEUES=queue, CELERY_ROUTES=route)

4.在main.py中需要将app.conf.beat_schedule改为app.conf.update,具体如下

代码语言:javascript
复制
from celery.schedules import crontab

from tasks import *


app.conf.update(
    CELERYBEAT_SCHEDULE={
        "crontab_func1": {
            'task': 'tasks.crontab_func1',
            'schedule': crontab(minute='*/1'),
            'args': ()
        },
        "crontab_func2": {
            'task': 'tasks.crontab_func2',
            'schedule': crontab(minute='*/1'),
            'args': ()
        },
    },
)

四. 定时任务的启动 在任务的启动命令中要加上-Q参数,指定任务的队列名,也就是在config.py中自定义的Queue名

代码语言:javascript
复制
# -Q指定当前定时任务的队列,与config.py中定义的queue名保持一致
celery multi start demo_work -A main -Q queue_demo -l info -B --logfile=crontablog.log
# 停止将start改成stop
celery multi stop demo_work -A main -Q queue_demo -l info -B --logfile=crontablog.log
# 重启用restart
celery multi restart demo_work -A main -Q queue_demo -l info -B --logfile=crontablog.log

现在每个定时任务都有指定的队列,所以不管有多少定时任务,都不会出现冲突.

后续如果还有更多的Celery定时任务,均可使用这个方法,定义不重复(通过队列名区分)的Queue和Route,按照上面的步骤实现,避免不同项目之间存取任务的混乱.

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-04-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python 碎片 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档