前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式任务管理系统 Celery 之三

分布式任务管理系统 Celery 之三

作者头像
用户1278550
发布2018-08-09 14:01:12
4730
发布2018-08-09 14:01:12
举报
文章被收录于专栏:idbaidba

一 前言

前面一篇文章 分布式任务管理系统 Celery 之二 以工程实践为例进行深入学习Celery,介绍工程中Celery的配置结构,调用方法,定时任务相关知识,本文继续介绍celery的任务的高级特性-工作流(canvas)

二 工作流

工作流基于子任务(subtask)实现. 子任务也可以视为一种任务,但如果把任务视为函数的话,它可能是填了部分参数的函数。子任务的主要价值在于它可以用于关联运算中,即几个子任务按某种工作流方式的定义执行更为复杂的任务。

2.1 子任务

In [32]: from celery_app.task1 import add In [33]: task = add.subtask((6,6),countdown=3) In [34]: task.args Out[34]: (6, 6) In [35]: task.name Out[35]: u'celery_app.task1.add' In [36]: task.apply_async() Out[36]: <AsyncResult: 7e530aa8-381b-42fe-9a5f-e9c5aad73cc1>

add.subtask 也可以使用 add.s() 代替。Celery的工作流主要包含以下几种

2.2 chain 串行的执行任务,将前面task的执行结果作为参数传递给后面,直到全部执行完成

In [8]: from celery_app.task1 import add

In [9]: from celery_app.task2 import mul

In [10]: from celery import chain

In [11]: res = chain(add.s(2, 2), add.s(4), mul.s(3))()

In [12]: res.get()

Out[12]: 24

日志显示 执行两次add ,一次mul 任务 2+2=4,作为第二次 add 的参数 4+4 得到8 ,在作为mul的参数 3*8=24

2.3 group 并行的执行一系列任务

In [8]: from celery import group

In [10]: res = group(add.s(i,i) for i in range(3))()

In [11]: res.get()

Out[11]: [0, 2, 4]

日志输出

从日志中可以看出来任务执行的顺序是并行的。

2.4 chord是包含回调的group操作

In [18]: from celery import chord In [19]: res = chord((add.s(i,i) for i in range(5)), add.s(['end']))() In [20]: res.get() Out[20]: [0, 2, 4, 6, 8, u'end']

# 执行完前面的for 循环之后,在结果的list 中添加了一个 a

日志显示

2.5 starmap/map

可以将每个参数都作为任务的参数执行一遍,map接收一个参数,starmap可以接收两个参数。本例add需要2个参数,故使用starmap。

In [15]: ret = add.starmap(zip(range(4),range(4)))

In [17]: print ret

[celery_app.task1.add(*x) for x in [(0, 0), (1, 1), (2, 2), (3, 3)]]

日志显示

执行四次add 任务,并将结果汇合为一个列表。

add.starmap 返回的并不是一个结果集,而是一个task实例。

In [18]: add.starmap(zip(range(4),range(4)))

Out[18]: [celery_app.task1.add(*x) for x in [(0, 0), (1, 1), (2, 2), (3, 3)]]

In [19]: ~add.starmap(zip(range(4),range(4)))

Out[19]: [0, 2, 4, 6]

In [21]: [add(*x) for x in [(0, 0), (1, 1), (2, 2), (3, 3)]]

Out[21]: [0, 2, 4, 6]

2.6 chunks 可以将任务分块,并发执行task并将任务的结果汇总为列表。

In [26]: res = add.chunks(zip(range(10),range(10)),5)()

In [27]: res.get()

Out[27]: [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]

日志显示

三 小结

本文浅显的介绍了Celery task的高阶特性--任务流以及子任务,能解决一下简单的具有依赖关系的任务流程需求。更多的知识点需要有兴趣的朋友详细查看官方文档自己探索,毕竟 纸上来得终觉浅,绝知此事要躬行。

最后 祝大家圣诞节快乐。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-12-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档