一 前言
前面一篇文章 分布式任务管理系统 Celery 之二 以工程实践为例进行深入学习Celery,介绍工程中Celery的配置结构,调用方法,定时任务相关知识,本文继续介绍celery的任务的高级特性-工作流(canvas)
二 工作流
工作流基于子任务(subtask)实现. 子任务也可以视为一种任务,但如果把任务视为函数的话,它可能是填了部分参数的函数。子任务的主要价值在于它可以用于关联运算中,即几个子任务按某种工作流方式的定义执行更为复杂的任务。
2.1 子任务
In [32]: from celery_app.task1 import add In [33]: task = add.subtask((6,6),countdown=3) In [34]: task.args Out[34]: (6, 6) In [35]: task.name Out[35]: u'celery_app.task1.add' In [36]: task.apply_async() Out[36]: <AsyncResult: 7e530aa8-381b-42fe-9a5f-e9c5aad73cc1>
add.subtask 也可以使用 add.s() 代替。Celery的工作流主要包含以下几种
2.2 chain 串行的执行任务,将前面task的执行结果作为参数传递给后面,直到全部执行完成
In [8]: from celery_app.task1 import add
In [9]: from celery_app.task2 import mul
In [10]: from celery import chain
In [11]: res = chain(add.s(2, 2), add.s(4), mul.s(3))()
In [12]: res.get()
Out[12]: 24
日志显示 执行两次add ,一次mul 任务 2+2=4,作为第二次 add 的参数 4+4 得到8 ,在作为mul的参数 3*8=24
2.3 group 并行的执行一系列任务
In [8]: from celery import group
In [10]: res = group(add.s(i,i) for i in range(3))()
In [11]: res.get()
Out[11]: [0, 2, 4]
日志输出
从日志中可以看出来任务执行的顺序是并行的。
2.4 chord是包含回调的group操作
In [18]: from celery import chord In [19]: res = chord((add.s(i,i) for i in range(5)), add.s(['end']))() In [20]: res.get() Out[20]: [0, 2, 4, 6, 8, u'end']
# 执行完前面的for 循环之后,在结果的list 中添加了一个 a
日志显示
2.5 starmap/map
可以将每个参数都作为任务的参数执行一遍,map接收一个参数,starmap可以接收两个参数。本例add需要2个参数,故使用starmap。
In [15]: ret = add.starmap(zip(range(4),range(4)))
In [17]: print ret
[celery_app.task1.add(*x) for x in [(0, 0), (1, 1), (2, 2), (3, 3)]]
日志显示
执行四次add 任务,并将结果汇合为一个列表。
add.starmap 返回的并不是一个结果集,而是一个task实例。
In [18]: add.starmap(zip(range(4),range(4)))
Out[18]: [celery_app.task1.add(*x) for x in [(0, 0), (1, 1), (2, 2), (3, 3)]]
In [19]: ~add.starmap(zip(range(4),range(4)))
Out[19]: [0, 2, 4, 6]
In [21]: [add(*x) for x in [(0, 0), (1, 1), (2, 2), (3, 3)]]
Out[21]: [0, 2, 4, 6]
2.6 chunks 可以将任务分块,并发执行task并将任务的结果汇总为列表。
In [26]: res = add.chunks(zip(range(10),range(10)),5)()
In [27]: res.get()
Out[27]: [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]
日志显示
三 小结
本文浅显的介绍了Celery task的高阶特性--任务流以及子任务,能解决一下简单的具有依赖关系的任务流程需求。更多的知识点需要有兴趣的朋友详细查看官方文档自己探索,毕竟 纸上来得终觉浅,绝知此事要躬行。
最后 祝大家圣诞节快乐。