前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >6. Celery 4.3.0 signatures 任务签名 以及 Primitives任务执行流程 group 、chain

6. Celery 4.3.0 signatures 任务签名 以及 Primitives任务执行流程 group 、chain

作者头像
Devops海洋的渔夫
发布2022-01-17 13:53:50
3950
发布2022-01-17 13:53:50
举报
文章被收录于专栏:Devops专栏Devops专栏

官网文档

https://celery.readthedocs.io/en/latest/userguide/canvas.html

1. signature

到前面的调用任务篇章为止,我们在调用任务的时候只是学习了如何使用delay()和apply_async()方法,当然这两个方法也是非常常用的。但是有时我们并不想简单的将任务发送到队列中,我们想将一个任务函数(由参数和执行选项组成)作为一个参数传递给另外一个函数中,为了实现此目标,Celery使用一种叫做signatures的东西。

**signature()**包含了以下参数:

  • 任务调用的 arguments(参数,即任务本身的参数,像add(x,y)中的参数)
  • keyword arguments(关键字参数,就是debug=false,true这类参数)
  • execution options(执行选项,比如运行时间countdown,到期时间expirt)。

一个signature包装了一个参数和执行选项的单个任务调用。我们可将这个signature传递给函数。

我们先看下tasks.py模块中定义的任务函数:

代码语言:javascript
复制
from celery_tasks.celery import app as celery_app

# 创建任务函数
@celery_app.task
def my_task1():
    print("任务函数(my_task1)正在执行....")

@celery_app.task
def my_task2():
    print("任务函数(my_task2)正在执行....")

@celery_app.task
def my_task3():
    print("任务函数(my_task3)正在执行....")

@celery_app.task
def my_task4(a,b):
    print("任务函数(my_task4)正在执行....")
    return a + b

我们将my_task1() 任务包装signature 执行看看:

代码语言:javascript
复制
# 导入signature
In [24]:  from celery import signature

# 对任务进行签名
In [25]: t1 = signature(my_task1,countdown=1)

# 调用任务
In [27]: t1.delay()
Out[27]: <AsyncResult: dd77773f-e297-47f3-8fe9-42db6fda8da0>

In [28]:

看看celery的worker这块的执行情况,如下:

下面再来对 my_task4() 需要传参数的任务进行signature包装。

代码语言:javascript
复制
In [28]: t4 = signature(my_task4,args=(20, 30),countdown=1)

In [29]: t4.delay()
Out[29]: <AsyncResult: 88958863-24f5-4314-8690-44c0045e7be9>

再来看看celery的worker执行情况,如下:

2. Primitives

这些primitives本身就是signature对象,因此它们可以以多种方式组合成复杂的工作流程。primitives如下:

  • group: 一组任务并行执行,返回一组返回值,并可以按顺序检索返回值。
  • chain: 任务一个一个执行,一个执行完将执行return结果传递给下一个任务函数.

tasks.py模块如下:

代码语言:javascript
复制
from celery_tasks.celery import app as celery_app

# 创建任务函数
@celery_app.task
def my_task1(a, b):
    print("任务函数(my_task1)正在执行....")
    return a + b

@celery_app.task
def my_task2(a, b):
    print("任务函数(my_task2)正在执行....")
    return a + b

@celery_app.task
def my_task3(a, b):
    print("任务函数(my_task3)正在执行....")
    return a + b

group案例如下:

代码语言:javascript
复制
# 导入各个task任务
In [1]: from celery_tasks.tasks import my_task1, my_task2, my_task3

# 导入group
In [2]: from celery import group

# 导入signature
In [3]: from celery import signature

# 创建signature
In [4]: t1 = signature(my_task1,args=(1, 2),countdown=1)

In [5]: t2 = signature(my_task2,args=(3, 4),countdown=1)

In [6]: t3 = signature(my_task3,args=(5, 6),countdown=1)

# 将多个signature放入同一组中
In [7]: my_group = group(t1,t2,t3)

# 执行组任务
In [11]: ret = my_group()

# 输出每个任务结果
In [13]: print(ret.get())
[3, 7, 11]

从celery的worker日志来看,执行group任务的时候,三个task任务是同时进行的。

chain案例如下:

代码语言:javascript
复制
In [1]: from celery_tasks.tasks import my_task1, my_task2, my_task3

In [2]: from celery import signature

In [3]: from celery import chain

# 将多个signature组成一个任务链
# my_task1的运行结果将会传递给my_task2
# my_task2的运行结果会传递给my_task3
In [4]: my_chain = chain(my_task1.s(10,10) | my_task2.s(20) | my_task3.s(30))

# 执行任务链
In [5]: ret = my_chain()

# 输出最终结果
In [6]: print(ret.get())
70

In [7]:

查看worker日志如下:

可以看到,执行的结果是三个任务执行相加的总和。其中my_task1.s(10,10) 也是signature的一种写法。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-10-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 海洋的渔夫 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 官网文档
  • 1. signature
  • 2. Primitives
    • tasks.py模块如下:
      • group案例如下:
        • chain案例如下:
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档