前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python测试开发django-158.celery 学习与使用

python测试开发django-158.celery 学习与使用

作者头像
上海-悠悠
发布2021-10-20 11:38:57
4410
发布2021-10-20 11:38:57
举报
文章被收录于专栏:从零开始学自动化测试

前言

Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个任务队列,专注于实时处理,同时还支持任务调度。 可以使用的场景如:

  • 异步发邮件,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
  • 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
  • 定时调度任务等

Celery 简介

Celery 扮演生产者和消费者的角色,先了解一下什么是生产者消费者模式。 该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示:

接下来需要弄清楚几个问题,谁生产数据(Task),谁是中间件(Broker),谁来消费数据(Worker),消费完之后运行结果(backend)在哪?

看下图就很清楚了

celery 的5个角色

  • Task 就是任务,有异步任务(Async Task)和定时任务(Celery Beat)
  • Broker 中间人,接收生产者发来的消息即Task,将任务存入队列。 任务的消费者是Worker。 Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
  • Worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
  • Beat 定时任务调度器,根据配置定时将任务发送给Broker。
  • Backend 用于存储任务的执行结果。

使用环境

Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。那么需要先安装Redis之类的中间件

代码语言:javascript
复制
docker pull redis:latest
docker run -itd --name redis-test -p 6379:6379 redis

pip 安装相关依赖包

代码语言:javascript
复制
pip install celery==3.1.26.post2
pip install redis==2.10.6

Task 任务

先写个最简单的demo,新建一个tasks.py文件,task任务需使用@shared_task装饰器

代码语言:javascript
复制
from celery import Celery
from celery import shared_task

# 实例化,添加broker地址
app = Celery('tasks', broker='redis://ip:6379')

@shared_task
def add(x, y):
    return x + y

打开 tasks.py 所在的目录,启动 worker,-A 参数表示的是 Celery APP 的名,这里指的是 tasks.py。 worker 是一个执行任务角色,后面的 loglevel=info 记录日志类型默认是 info。

代码语言:javascript
复制
celery -A tasks worker --loglevel=info

运行结果

代码语言:javascript
复制
D:\demo\demo\aaa>celery -A tasks worker --loglevel=info
[2021-10-19 09:12:01,168: WARNING/MainProcess] e:\python36\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x2148e024c50
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2021-10-19 09:12:01,276: INFO/MainProcess] Connected to redis://localhost:6379//
[2021-10-19 09:12:01,365: INFO/MainProcess] mingle: searching for neighbors
[2021-10-19 09:12:02,761: INFO/MainProcess] mingle: all alone
[2021-10-19 09:12:03,313: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.

从运行日志看到有tasks任务

代码语言:javascript
复制
[tasks]
  . tasks.add

看到Connected to redis说明已经连接成功了

触发任务(delay)

任务已经创建了,那么什么时候去触发这个任务呢,我们需要在代码里面去触发这个任务,接着上面代码继续写

代码语言:javascript
复制
@shared_task
def add(x, y):
    return x + y

if __name__ == '__main__':
    # 触发任务
    res = add.delay(10, 15)
    print(res)
    print(type(res))  # AsyncResult

运行结果

代码语言:javascript
复制
7492f49b-6735-46fb-a16d-9ec24bd31e56
<class 'celery.result.AsyncResult'>

通过add任务,调用 .delay() 方法来触发一次任务,返回 AsyncResult 类,那么执行的任务结果都在 AsyncResult 类里

运行 的时候查看 worker 运行日志,可以看到已经接收到任务Received task,每个任务会生成一个uuid的task_id,不会重复

代码语言:javascript
复制
[2021-10-19 09:24:14,356: INFO/MainProcess] Received task: tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98]
[2021-10-19 09:24:14,395: INFO/MainProcess] Task tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98] succeeded in 0.046999999998661224s: 25

workder 会自动监听到推过来的任务,然后执行,可以看到执行结果’succeeded ‘

backend 任务结果

调用 .delay() 方法触发任务后,返回 AsyncResult 类,可以查看任务的状态,任务id和任务结果

代码语言:javascript
复制
D:\demo\demo\aaa>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.

>>> from tasks import add
>>> res = add.delay(10, 15)
>>> res.task_id
'6a7c8e10-7192-4865-9108-3e98596b9d37'
>>>
>>> res.status
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "E:\python36\lib\site-packages\celery\result.py", line 394, in state
    return self._get_task_meta()['status']
  File "E:\python36\lib\site-packages\celery\result.py", line 339, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "E:\python36\lib\site-packages\celery\backends\base.py", line 307, in get_task_meta
    meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
>>>

任务执行后会生成一个task_id,查看任务运行状态,会发现出现异常AttributeError: ‘DisabledBackend’ object has no attribute ‘_get_task_meta_for’ 这是因为任务运行的结果,需要保存到一个地方backend,但是前面实例化的时候只配置一个broker地址,并没有配置backend地址来接收运行结果

代码语言:javascript
复制
from celery import Celery
from celery import shared_task
# backend接收执行结果
app = Celery('tasks',
             broker='redis://ip:6379',
             backend='redis://ip:6379')

@shared_task
def add(x, y):
    return x + y

重新配置后一定要重启worker监听

代码语言:javascript
复制
celery -A tasks worker --loglevel=info

在启动日志[config]里面会看到results这一项已经配置成功

代码语言:javascript
复制
D:\demo\demo\aaa>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> res = add.delay(10,15)
>>> res.task_id
'5ce249c9-a15b-426a-949b-d1b94bf9f8fa'
>>>
>>> res.status
'SUCCESS'
>>>
>>> res.get()
25

常用的几个属性

  • res.task_id 任务id唯一的,可以根据id拿到结果
  • res.status 任务状态:PENDING、STARTED、RETRY、FAILURE、SUCCESS
  • res.get() 任务运行结果,必须要任务状态是’SUCCESS’,才会有运行结果

AsyncResult 获取结果

当触发一个任务后,会得到一个task_id,但是我们不会一直去查询status状态去获取结果,可能会过一段时间再去看看运行结果。 那么在已经知道task_id 的情况下,如何去查询状态和结果?可以用到AsyncResult 类

代码语言:javascript
复制
from celery.result import AsyncResult
res = AsyncResult(id='5ce249c9-a15b-426a-949b-d1b94bf9f8fa')
print(res.state)   # 'SUCCESS'
print(res.get())  # 25

结合django使用,参考前面这篇https://www.cnblogs.com/yoyoketang/p/15422804.html 更多参考教程https://blog.csdn.net/u010339879/article/details/97691231 更多参考教程https://www.pianshen.com/article/2176289575/

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 从零开始学自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Celery 简介
  • 使用环境
  • Task 任务
  • 触发任务(delay)
  • backend 任务结果
  • AsyncResult 获取结果
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档