专栏首页idba分布式任务管理系统 Celery 之三

分布式任务管理系统 Celery 之三

一 前言

前面一篇文章 分布式任务管理系统 Celery 之二 以工程实践为例进行深入学习Celery,介绍工程中Celery的配置结构,调用方法,定时任务相关知识,本文继续介绍celery的任务的高级特性-工作流(canvas)

二 工作流

工作流基于子任务(subtask)实现. 子任务也可以视为一种任务,但如果把任务视为函数的话,它可能是填了部分参数的函数。子任务的主要价值在于它可以用于关联运算中,即几个子任务按某种工作流方式的定义执行更为复杂的任务。

2.1 子任务

In [32]: from celery_app.task1 import add In [33]: task = add.subtask((6,6),countdown=3) In [34]: task.args Out[34]: (6, 6) In [35]: task.name Out[35]: u'celery_app.task1.add' In [36]: task.apply_async() Out[36]: <AsyncResult: 7e530aa8-381b-42fe-9a5f-e9c5aad73cc1>

add.subtask 也可以使用 add.s() 代替。Celery的工作流主要包含以下几种

2.2 chain 串行的执行任务,将前面task的执行结果作为参数传递给后面,直到全部执行完成

In [8]: from celery_app.task1 import add

In [9]: from celery_app.task2 import mul

In [10]: from celery import chain

In [11]: res = chain(add.s(2, 2), add.s(4), mul.s(3))()

In [12]: res.get()

Out[12]: 24

日志显示 执行两次add ,一次mul 任务 2+2=4,作为第二次 add 的参数 4+4 得到8 ,在作为mul的参数 3*8=24

2.3 group 并行的执行一系列任务

In [8]: from celery import group

In [10]: res = group(add.s(i,i) for i in range(3))()

In [11]: res.get()

Out[11]: [0, 2, 4]

日志输出

从日志中可以看出来任务执行的顺序是并行的。

2.4 chord是包含回调的group操作

In [18]: from celery import chord In [19]: res = chord((add.s(i,i) for i in range(5)), add.s(['end']))() In [20]: res.get() Out[20]: [0, 2, 4, 6, 8, u'end']

# 执行完前面的for 循环之后,在结果的list 中添加了一个 a

日志显示

2.5 starmap/map

可以将每个参数都作为任务的参数执行一遍,map接收一个参数,starmap可以接收两个参数。本例add需要2个参数,故使用starmap。

In [15]: ret = add.starmap(zip(range(4),range(4)))

In [17]: print ret

[celery_app.task1.add(*x) for x in [(0, 0), (1, 1), (2, 2), (3, 3)]]

日志显示

执行四次add 任务,并将结果汇合为一个列表。

add.starmap 返回的并不是一个结果集,而是一个task实例。

In [18]: add.starmap(zip(range(4),range(4)))

Out[18]: [celery_app.task1.add(*x) for x in [(0, 0), (1, 1), (2, 2), (3, 3)]]

In [19]: ~add.starmap(zip(range(4),range(4)))

Out[19]: [0, 2, 4, 6]

In [21]: [add(*x) for x in [(0, 0), (1, 1), (2, 2), (3, 3)]]

Out[21]: [0, 2, 4, 6]

2.6 chunks 可以将任务分块,并发执行task并将任务的结果汇总为列表。

In [26]: res = add.chunks(zip(range(10),range(10)),5)()

In [27]: res.get()

Out[27]: [[0, 2], [4, 6], [8, 10], [12, 14], [16, 18]]

日志显示

三 小结

本文浅显的介绍了Celery task的高阶特性--任务流以及子任务,能解决一下简单的具有依赖关系的任务流程需求。更多的知识点需要有兴趣的朋友详细查看官方文档自己探索,毕竟 纸上来得终觉浅,绝知此事要躬行。

最后 祝大家圣诞节快乐。

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-12-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 分布式任务管理系统 Celery 之一

    开发自动化管理平台的过程中,有执行时间较长的任务比如安装基础软件,备份恢复;有定时执行的任务比如定期收集元数据,检查慢日志数量等等,我们可以自己开发一...

    用户1278550
  • 分布式任务管理系统 Celery 之二

    前面一篇文章分布式任务管理系统 Celery 之一介绍了分布式任务调度队列Celery的框架以及原理,使用的例子比较简单,对实际的使用场景没有意义。本系列文章...

    用户1278550
  • Redis 6.0 新特性概览

    Redis 6 RC2 于今年3月5号Release,预计今年4.30月份发布GA版本,官方网站提供 unstable 版本的供大家测试,本文基于官方文档介绍R...

    用户1278550
  • 小工具:同步系统时间

    超级大猪
  • Python的functools模块

      元组WRAPPER_ASSIGNMENTS中是要被覆盖的属性:模块名、名称、限定名、文档、参数注解

    py3study
  • 使用命令行检测Ubuntu版本方法

    lsb_release实用程序可以显示有关Linux发行版的LSB(Linux标准库)信息。它是检查Ubuntu版本的首选方法,无论运行的是哪个桌面环境或Ubu...

    砸漏
  • cmake:用add_subdirectory()添加外部项目文件夹

    版权声明:本文为博主原创文章,转载请注明源地址。 https://blog.csdn.net/10...

    用户1148648
  • 事务处理的数据存储

    在上篇文章我们讨论了数据模型,今天试着讨论更基础的数据存储和搜索。数据存储根据开发者使用,可以分为一般的事务处理和数据分析,因为这两者面临的情况不一样。事务处理...

    哒呵呵
  • 300亿条出租车数据里的五大秘密:上海8点13分最堵,司机凌晨喜欢把车停靠在…

    上海强生出租车公司的出租车每隔10秒钟会自动向总部的服务器发送一条数据,记录自己所在的经纬度、车速、车内是否有人、行驶方向等信息。2015年上海政府公开了4月一...

    华章科技
  • [javaEE] 反射-通过反射了解集合泛型本质

    陶士涵

扫码关注云+社区

领取腾讯云代金券