前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python Celery 库详解

Python Celery 库详解

原创
作者头像
Michel_Rolle
发布2024-02-06 14:45:42
4730
发布2024-02-06 14:45:42
举报
文章被收录于专栏:Python技术专栏Python技术专栏

Celery 是一个基于分布式消息传递的任务队列,用于异步处理任务。它可以与各种消息代理(如RabbitMQ、Redis等)配合使用,支持任务调度、消息传递等功能。本教程将介绍如何使用 Celery 库来创建和管理异步任务。

安装 Celery

首先,我们需要安装 Celery。你可以通过 pip 来安装 Celery:

代码语言:javascript
复制
bashCopy codepip install celery

创建 Celery 应用

在使用 Celery 之前,我们需要创建一个 Celery 应用。创建一个名为 celery_app.py 的文件,并在其中定义 Celery 应用:

代码语言:javascript
复制
pythonCopy codefrom celery import Celery

# 创建 Celery 应用实例
app = Celery('tasks', broker='redis://localhost:6379/0')

# 定义任务
@app.task
def add(x, y):
    return x + y

在这个示例中,我们使用 Redis 作为消息代理,你也可以选择其他的消息代理,如 RabbitMQ。

定义任务

在 Celery 应用中,任务是通过装饰器 @app.task 来定义的。下面是一个简单的任务示例:

代码语言:javascript
复制
pythonCopy code@app.task
def add(x, y):
    return x + y

这个任务接受两个参数 xy,并返回它们的和。

启动 Worker

要执行任务,我们需要启动 Celery worker。在命令行中执行以下命令:

代码语言:javascript
复制
bashCopy codecelery -A celery_app worker --loglevel=info

这将启动一个 Celery worker 来处理任务。

调用任务

在其他地方调用 Celery 任务非常简单。只需导入任务函数并调用它即可:

代码语言:javascript
复制
pythonCopy codefrom celery_app import add

result = add.delay(4, 5)
print(result.get())

在这个示例中,我们调用了之前定义的 add 任务,并传递了参数 4 和 5。delay() 方法用于将任务放入队列中,然后我们使用 get() 方法来获取任务的结果。

异步执行任务

Celery 的主要优势之一是它可以异步执行任务。这意味着任务可以在后台执行,而不会阻塞主程序的执行。下面是一个异步执行任务的示例:

代码语言:javascript
复制
pythonCopy codefrom celery_app import add

result = add.delay(4, 5)

# 执行其他操作

print("其他操作执行中...")

# 等待任务完成并获取结果
print(result.get())

在这个示例中,任务被放入队列后,程序可以继续执行其他操作,而不必等待任务完成。

监控任务状态

有时候,我们需要监控任务的状态,以便知道任务是成功完成、失败还是正在执行中。Celery 提供了状态监控的功能。下面是一个示例:

代码语言:javascript
复制
pythonCopy codefrom celery_app import add

result = add.delay(4, 5)

# 监控任务状态
while not result.ready():
    print("任务正在执行中...")
    # 可选:获取任务状态
    print("任务状态:", result.status)

print("任务完成")
print("任务结果:", result.get())

在这个示例中,我们使用 result.ready() 方法来检查任务是否完成。如果任务完成,我们可以使用 result.get() 方法来获取任务的结果。

错误处理

当任务执行出错时,我们可以捕获异常并处理。下面是一个示例:

代码语言:javascript
复制
pythonCopy codefrom celery.exceptions import SoftTimeLimitExceeded

try:
    result = add.delay(4, "invalid")
    print(result.get())
except SoftTimeLimitExceeded as e:
    print("任务执行超时:", e)
except Exception as e:
    print("任务执行出错:", e)

在这个示例中,我们捕获了 SoftTimeLimitExceeded 异常和其他异常,并打印出错误消息。

结束 Worker

当不再需要 Celery worker 时,我们可以通过发送中断信号来结束它。在命令行中按下 Ctrl + C 即可结束 Celery worker。

监控任务状态

有时候,我们需要监控任务的状态,以便知道任务是成功完成、失败还是正在执行中。Celery 提供了状态监控的功能。下面是一个示例:

代码语言:javascript
复制
pythonCopy codefrom celery_app import add

result = add.delay(4, 5)

# 监控任务状态
while not result.ready():
    print("任务正在执行中...")
    # 可选:获取任务状态
    print("任务状态:", result.status)

print("任务完成")
print("任务结果:", result.get())

在这个示例中,我们使用 result.ready() 方法来检查任务是否完成。如果任务完成,我们可以使用 result.get() 方法来获取任务的结果。

错误处理

当任务执行出错时,我们可以捕获异常并处理。下面是一个示例:

代码语言:javascript
复制
pythonCopy codefrom celery.exceptions import SoftTimeLimitExceeded

try:
    result = add.delay(4, "invalid")
    print(result.get())
except SoftTimeLimitExceeded as e:
    print("任务执行超时:", e)
except Exception as e:
    print("任务执行出错:", e)

在这个示例中,我们捕获了 SoftTimeLimitExceeded 异常和其他异常,并打印出错误消息。

结束 Worker

当不再需要 Celery worker 时,我们可以通过发送中断信号来结束它。在命令行中按下 Ctrl + C 即可结束 Celery worker。

任务结果处理

Celery 支持异步执行任务,并在任务执行完成后返回结果。你可以对任务结果进行处理,比如存储到数据库、发送通知等。下面是一个示例:

代码语言:javascript
复制
pythonCopy codefrom celery_app import add

result = add.delay(4, 5)

# 可选:等待任务完成
result.wait()

# 处理任务结果
if result.successful():
    print("任务成功完成")
    print("任务结果:", result.result)
    # 在这里可以将结果存储到数据库或发送通知等
else:
    print("任务执行失败")
    print("任务异常:", result.result)

在这个示例中,我们使用 result.successful() 方法来检查任务是否成功完成,并根据结果进行相应的处理。

设置任务超时

有时候,任务可能会因为某些原因长时间执行而导致超时。为了避免任务执行时间过长,可以设置任务的超时时间。下面是一个示例:

代码语言:javascript
复制
pythonCopy codefrom celery.exceptions import SoftTimeLimitExceeded

try:
    result = add.apply_async((4, 5), soft_time_limit=10)
    print(result.get())
except SoftTimeLimitExceeded as e:
    print("任务执行超时:", e)
except Exception as e:
    print("任务执行出错:", e)

在这个示例中,我们使用 apply_async() 方法来启动任务,并通过 soft_time_limit 参数设置任务的软超时时间为 10 秒。如果任务在指定的时间内未完成,则会抛出 SoftTimeLimitExceeded 异常。

高级特性

除了上述介绍的基本功能外,Celery 还提供了许多高级特性,如定时任务、任务重试、任务链、分布式任务等。你可以根据实际需求选择使用这些特性。以下是一些高级特性的简单介绍:

  • 定时任务:Celery 支持定时执行任务,可以使用 @app.task 装饰器的 eta 参数或 apply_async() 方法的 eta 参数来设置任务的执行时间。
  • 任务重试:Celery 允许你在任务执行失败时自动重试任务。你可以使用 @app.task 装饰器的 retry 参数来配置任务的重试策略。
  • 任务链:Celery 允许你将多个任务组合成一个任务链,其中一个任务的输出作为下一个任务的输入。你可以使用 chaingroup 组合任务。
  • 分布式任务:Celery 支持分布式任务,可以将任务分发到多台计算机上执行,从而提高任务执行的效率和并发性。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装 Celery
  • 创建 Celery 应用
  • 定义任务
  • 启动 Worker
  • 调用任务
  • 异步执行任务
  • 监控任务状态
  • 错误处理
  • 结束 Worker
  • 监控任务状态
  • 错误处理
  • 结束 Worker
  • 任务结果处理
  • 设置任务超时
  • 高级特性
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档