首页
学习
活动
专区
圈层
工具
发布

python celery(并行编程 31)

import time from celery import Celery

broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery('my_tasks', broker=broker, backend=backend)

@app.task def add(x, y): print('enter task') time.sleep(3) return x + y if name == 'main': print('start task') result = add.delay(3, 18) print('end task') print(result)

broker 指定任务队列的消息中间件,backend 指定了任务执行结果的存储。

@app.task 注册任务 add.delay调用

@app.task(name='task.add', serializer='json')

在对象中添加:

logger = get_task_logger(name)

@task(bind=True) def add(self, x, y): logger.info(self.request.id)

task参数列表:http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-options

任务执行,10秒后 apply_async()

add.apply_async((1, 2), countdown=10)

任务执行

  1. delay, 用来进行最简单便捷的任务执行;
  2. apply_async, 对于任务的执行附加额外的参数,对任务进行控制;
  3. app.send_task, 可以执行未在 Celery 中进行注册的任务

重载task

import celery

class MyTask(celery.Task):

代码语言:javascript
复制
def on_failure(self, exc, task_id, args, kwargs, einfo):
    print('{0!r} failed: {1!r}'.format(task_id, exc))

@task(base=MyTask) def add(x, y): raise KeyError()

  • after_return:在任务执行返回后交给 worker 执行
  • on_failure:在任务执行失败后交给 worker 执行
  • on_retry:在任务进行重试是交给 worker 执行
  • on_success:在任务执行成功后交给 worker 执行

工作流:上一次执行的结果传递到下一次

from celery import chain result = chain(add.s(1, 2), add.s(3), add.s(4)) # 1+2+3+4 result().get() 10

group 任务的并发执行:

from celery import group group(add.s(1, 2), add.s(3,4), add.s(5,6))().get() [3, 7, 11]

chord将执行的结果回调:

@app.task def xsum(values): return sum(values)

from celery import chord chord((add.s(i, i) for i in xrange(10)), xsum.s())().get() # xsum 收到 [2,4,6,...,18] 90

chunks 大任务分小

add.chunks(zip(range(100), range(100)), 10)().get()

下一篇
举报
领券