首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python—Celery异步分布式

python—Celery异步分布式

作者头像
py3study
发布2020-01-06 13:25:59
5230
发布2020-01-06 13:25:59
举报
文章被收录于专栏:python3python3

一、Celery异步分布式

Celery  是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向broker传递消息,然后celery的worker从中取消息

Celery  用于存储消息以及celery执行的一些消息和结果

对于brokers,官方推荐是rabbitmq和redis

对于backend,也就是指数据库,为了简单一般使用redis

clipboard.png
clipboard.png

使用redis连接url格式:

redis://:password@hostname:port/db_number

1)定义连接脚本tasks.py

#!/usr/bin/env python
from celery import Celery
broker = "redis://192.168.2.230:6379/1"
backend = "redis://192.168.2.230:6379/2"
app = Celery("tasks", broker=broker, backend=backend)

@app.task
def add(x,y):
    return x+y

2)安装启动celery

pip install celery

pip install redis

启动方式:celery -A huang tasks -l info  #-l 等同于 --loglevel

1.png
1.png

3)执行测试 huang.py 

#!/usr/bin/env python
from tasks import add

re = add.delay(10,20)

print(re.result)   #任务返回值
print(re.ready)     #如果任务被执行返回True,其他情况返回False

print(re.get(timeout=1))  #带参数的等待,最后返回结果
print(re.status)  #任务当前状态

运行结果:

30

<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>

30

SUCCESS

4)根据成功返回的key或celery界面输出的信息,查看redis存储

blob.png
blob.png

说明:停止celery服务,执行完huang.py之后,再启动celery服务也是有保存数据的

二、celery多进程

1.png
1.png

1)配置文件 celeryconfig.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-

from kombu import Exchange,Queue

BROKER_URL = "redis://192.168.2.230:6379/3"
CELERY_RESULT_BACKEND = "redis://192.168.2.230:6379/4"

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)

CELERY_ROUTES = {
'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
}

2)tasks.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-

from celery import Celery

app = Celery()
app.config_from_object("celeryconfig")

@app.task
    def taskA(x,y):
    return x+y
    
@app.task
    def taskB(x,y,z):
    return x+y+z

3)启动celery

celery -A tasks worker --loglevel info

4)执行脚本huang2.py

#!/usr/bin/env python
#-*- coding:utf-8 -*-

from tasks import taskA,taskB

re = taskA.delay(10,20)

print(re.result)   #任务返回值
print(re.ready)     #如果任务被执行返回True,其他情况返回False
print(re.get(timeout=1))  #带参数的等待,最后返回结果
print(re.status)  #任务当前状态

re2 = taskB.delay(10,20,30)
print(re2.result)
print(re2.ready)
print(re2.get(timeout=1))
print(re2.status)

5)运行结果

None

<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>

30

SUCCESS

None

<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>

60

SUCCESS

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档