前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python celery(并行编程 31)

python celery(并行编程 31)

作者头像
用户5760343
发布2019-07-30 10:50:13
6950
发布2019-07-30 10:50:13
举报
文章被收录于专栏:sktjsktj

import time from celery import Celery

broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery('my_tasks', broker=broker, backend=backend)

@app.task def add(x, y): print('enter task') time.sleep(3) return x + y if name == 'main': print('start task') result = add.delay(3, 18) print('end task') print(result)

broker 指定任务队列的消息中间件,backend 指定了任务执行结果的存储。

@app.task 注册任务 add.delay调用

@app.task(name='task.add', serializer='json')

在对象中添加:

logger = get_task_logger(name)

@task(bind=True) def add(self, x, y): logger.info(self.request.id)

task参数列表:http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-options

任务执行,10秒后 apply_async()

add.apply_async((1, 2), countdown=10)

任务执行

  1. delay, 用来进行最简单便捷的任务执行;
  2. apply_async, 对于任务的执行附加额外的参数,对任务进行控制;
  3. app.send_task, 可以执行未在 Celery 中进行注册的任务

重载task

import celery

class MyTask(celery.Task):

代码语言:javascript
复制
def on_failure(self, exc, task_id, args, kwargs, einfo):
    print('{0!r} failed: {1!r}'.format(task_id, exc))

@task(base=MyTask) def add(x, y): raise KeyError()

  • after_return:在任务执行返回后交给 worker 执行
  • on_failure:在任务执行失败后交给 worker 执行
  • on_retry:在任务进行重试是交给 worker 执行
  • on_success:在任务执行成功后交给 worker 执行

工作流:上一次执行的结果传递到下一次

from celery import chain result = chain(add.s(1, 2), add.s(3), add.s(4)) # 1+2+3+4 result().get() 10

group 任务的并发执行:

from celery import group group(add.s(1, 2), add.s(3,4), add.s(5,6))().get() [3, 7, 11]

chord将执行的结果回调:

@app.task def xsum(values): return sum(values)

from celery import chord chord((add.s(i, i) for i in xrange(10)), xsum.s())().get() # xsum 收到 [2,4,6,...,18] 90

chunks 大任务分小

add.chunks(zip(range(100), range(100)), 10)().get()

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.07.28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • broker 指定任务队列的消息中间件,backend 指定了任务执行结果的存储。
  • @app.task 注册任务 add.delay调用
  • @app.task(name='task.add', serializer='json')
  • 在对象中添加:
  • task参数列表:http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-options
  • 任务执行,10秒后 apply_async()
  • 任务执行
  • 重载task
  • 工作流:上一次执行的结果传递到下一次
  • group 任务的并发执行:
  • chord将执行的结果回调:
  • chunks 大任务分小
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档