前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文搞定 celery 任务远程调用

一文搞定 celery 任务远程调用

作者头像
somenzz
发布2020-12-10 11:22:53
2K0
发布2020-12-10 11:22:53
举报
文章被收录于专栏:Python七号Python七号

celery 是分布式的异步任务队列,既然是分布式,那么肯定是支持远程调度任务的,那么它是如何实现的呢?

celery 主要是通过中间人来实现远程调度的,中间人 broker 的工具如 RabbitMQ,Redis 服务支持远程访问。

由于官方的示例都是基于本地的任务调用,本文向大家展示如何使用 Celery 调用远程主机上的任务- 在主机 C 上调用主机 A 上的任务 taskA,调用主机 B 上的任务 taskB。

代码语言:javascript
复制
主机C ip地址:192.168.0.107  
主机A ip地址:192.168.0.111  
主机B ip地址:192.168.0.112

基于上一篇文章 分布式异步任务队列神器-Celery 中 myCeleryProj 项目的源码。

开始动手:

第一步:定义任务队列。

修改 settings.py 使任务 taskA 运行在队列 tasks_A 上,任务 taskB 运行在队列 tasks_B 上,中间人均指向主机 C 上的 redis 数据库:redis://192.168.0.107:6379/0,redis 数据库不是必须在主机C上启用,redis 数据库可运行在任意一台主机上,只要确保其允许远程访问即可。完整的settings.py如下所示

代码语言:javascript
复制
from kombu import Queue

CELERY_TIMEZONE='Asia/Shanghai'

CELERY_QUEUES = (  # 定义任务队列
    Queue("default", routing_key="task.#"),  # 路由键以“task.”开头的消息都进default队列
    Queue("tasks_A", routing_key="A.#"),  # 路由键以“A.”开头的消息都进tasks_A队列
    Queue("tasks_B", routing_key="B.#"),  # 路由键以“B.”开头的消息都进tasks_B队列
)

CELERY_ROUTES = (
   [
       ("myCeleryProj.tasks.add", {"queue": "default"}), # 将add任务分配至队列 default
       ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 将taskA任务分配至队列 tasks_A
       ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 将taskB任务分配至队列 tasks_B
   ],
)

BROKER_URL = "redis://192.168.137.129:6379/0"  # 使用redis 作为消息代理

CELERY_RESULT_BACKEND = "redis://192.168.137.129:6379/0"  # 任务结果存在Redis

CELERY_RESULT_SERIALIZER = "json"  # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显

第二步:确认中间人已启动,并部署源码。

1 确保主机C上的redis数据库服务已经启动,如有以下信息说明已经成功启动。

代码语言:javascript
复制
$ps -ef|grep redis
aaron 2229 2187 0 21:43 pts/1 00:00:00 ./redis-server 0.0.0.0:6379
aaron 2452 2437 0 21:47 pts/2 00:00:00 grep --color=auto redis

2 将 myCeleryProj 目录分别复制到三台主机中。

代码语言:javascript
复制
scp -r myCeleryProj aaron@192.168.0.111:~
scp -r myCeleryProj aaron@192.168.0.112:~
scp -r myCeleryProj aaron@192.168.0.107:~

第三步:启动主机 A、B 上的 worker。

在主机A 上启用worker,监控队列tasks_A(前提是已经完安装python库celery和redis),命令如下所示:

代码语言:javascript
复制
celery -A myCeleryProj.app worker -Q tasks_A -l info

在主机B上执行同样的操作:

代码语言:javascript
复制
celery -A myCeleryProj.app worker -Q tasks_B -l info

第四步:调用程序。

编写 start_tasks.py,如下所示:

代码语言:javascript
复制
from myCeleryProj.tasks import taskA,taskB
import time
#异步执行 方法一
#resultA = taskA.delay()
#resultB = taskB.delay()

#异步执行 方法二
resultA = taskA.apply_async()
resultB = taskB.apply_async(args=[])
resultC = taskA.apply_async(queue='tasks_B')

while not (resultA.ready() and resultB.ready()):# 循环检查任务是否执行完毕
    time.sleep(1)

print(resultA.successful()) #判断任务是否成功执行
print(resultB.successful()) #判断任务是否成功执行

其中上述代码第 9 行可以通过指定 queue=’tasks_B’ 的方式来在调用任务时改变taskA执行的队列,这在实用中是非常方便的。 执行 python start_tasks.py 得到如下结果

代码语言:javascript
复制
aaron@ubuntu:~$ python start_tasks.py
True
True

主机A,主机B上worker运行情况如下所示:

主机 A

主机 B

可以看出在主机 B 上,我们调用时指定队列的 taskA 也在队列 tasks_B 执行。 (完)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-09-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python七号 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 第一步:定义任务队列。
  • 第二步:确认中间人已启动,并部署源码。
  • 第三步:启动主机 A、B 上的 worker。
  • 第四步:调用程序。
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档