前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Celery入门与实战

Celery入门与实战

作者头像
五分钟学SRE
发布2023-12-05 18:16:27
4700
发布2023-12-05 18:16:27
举报
文章被收录于专栏:五分钟学SRE

在开发过程中,处理异步任务是一项重要而常见的任务。为了更好地管理和处理这些任务,目前比较强大与实用的有 Celery。Celery 是一个基于 Python 的分布式任务队列,旨在帮助开发者处理异步任务,从而提高应用程序的可伸缩性和性能。

Celery 简介

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It’s a task queue with focus on real-time processing, while also supporting task scheduling.

Celery 是一个开源 Python 库,用于异步运行任务。它是一个任务队列,保存任务并以适当的方式将它们分发给工作人员。它主要侧重于实时操作,但也支持调度(运行定期间隔任务)。为我们提供了高效的异步任务处理解决方案。Celery 引入了各种消息代理,例如RabbitMQRedis。

Celery 结合了各种 Web 框架,包括 Flask、Pylons、web2py、Tryton 和 Tornado。

Celery的优点

  1. 异步任务处理:Celery允许将耗时的任务异步执行,避免阻塞主应用程序。这对于需要长时间处理的任务,如发送电子邮件、处理大量数据,特别有用。
  2. 分布式计算:Celery支持将任务分发到多台计算机或节点上,从而实现分布式计算。这使得可以轻松地将任务分散到多个服务器上,以提高任务处理能力。
  3. 定时任务调度:Celery支持定时任务的调度,可以在预定的时间点或周期性地执行任务。这对于自动化重复性任务非常有用,如定时数据备份或数据清理。
  4. 可扩展性:Celery的架构支持水平扩展,可以根据需要增加更多的任务队列和工作进程,以适应不断增长的任务负载。
  5. 容错性:Celery提供了一些机制来处理失败的任务,例如重试机制和错误处理。它还支持将任务结果存储在持久化存储中,以防止任务结果丢失。

Celery的架构

Celery的架构由多个组件组成,包括任务发布者、任务队列和工作进程。以下是它们的主要角色:

  1. 任务发布者(Producer):任务发布者负责将需要执行的任务发布到任务队列中。这可以是Web应用、命令行工具或其他应用程序。
  2. 任务队列(Broker):任务队列是用于存储和传递待执行任务的中间件。Celery支持多种消息中间件,如RabbitMQ、Redis、Amazon SQS等。
  3. 工作进程(Worker):工作进程从任务队列中获取任务,执行任务,并将执行结果返回。您可以配置多个工作进程来处理任务,从而实现并行处理和高吞吐量。

消息代理

Celery 支持多种消息代理,其中两个常用的选择是 RabbitMQ 和 Redis。选择合适的消息代理取决于你的项目需求。

RabbitMQ

RabbitMQ 是一个高度可靠的消息代理,适用于大规模和复杂的应用程序。你可以使用 RabbitMQ 来实现任务的分发和处理,同时它支持高级的消息队列特性,如消息确认和持久化。

代码语言:javascript
复制
# 使用 RabbitMQ 作为消息代理
app = Celery('myapp', broker='pyamqp://guest@localhost//')

Redis

Redis 是一个快速的消息代理,适用于小型和中小型应用程序。它的速度和简单性使其成为一个不错的选择。你可以使用 Redis 来加速任务的分发和处理。

代码语言:javascript
复制
# 使用 Redis 作为消息代理
app = Celery('myapp', broker='redis://localhost:6379/0')

celery的安装与使用

创建python虚拟环境

代码语言:javascript
复制
python3.9 -m venv py39

进入虚拟环境

代码语言:javascript
复制
source py39/bin/activate 
pip install celery==5.1.2 pip install redis==3.5.3

异步任务示例

写一个task.py 文件,用于异步任务调度的demo

代码语言:javascript
复制
from celery import Celery
#定义消息代理(broker)的地址
BROKER_URL = 'redis://localhost:6379/0'
#定义结果后端(backend)的地址
BACKEND_URL = 'redis://localhost:6379/1'
#创建一个 Celery 应用实例
app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL, )

#使用装饰器定义一个 Celery 异步任务,任务名为 'celery demo run'
@app.task(name='celery demo run')
def add(x, y):
    return x + y

代码语言:javascript
复制
写一个run_task.py 文件,用于调度异步任务入口
代码语言:javascript
复制
from tasks import add

if __name__ == '__main__':
    print('task start....')
    result = add.delay(2,3)
    print('task end....')
    print(result)

在不启动 Celery 工作者(worker)的时候直接执行run_task.py,可以看到直接返回了celery异步任务的task id。

代码语言:javascript
复制
python run_task.py
task start....
task end....
cf191a9d-ef91-46ee-b0c4-5153e853079d

我们启动 Celery 工作者(worker),可以看到下面的输出,celery 从redis拿到任务,并且执行输出

启动 Celery 工作者参数:

代码语言:javascript
复制

-A, --app: 指定 Celery 应用模块的名称。这是必要参数,用于加载应用程序的配置。例如:-A tasks 表示加载名为 tasks 的 Celery 应用。
--loglevel: 指定日志级别,控制日志的输出详细程度。常用的级别包括 info、warning、error 等。
--concurrency: 设置工作者的并发数,即同时处理任务的数量。默认值是 CPU 核心数的 2 倍。
--queues: 指定工作者处理的队列。可以使用逗号分隔的队列名列表,例如 queue1,queue2。
--hostname: 设置工作者的主机名,用于识别不同的工作者实例。
--prefetch-multiplier: 设置工作者从队列中预取的任务数量。默认值为 4。
--max-tasks-per-child: 设置工作者在重新启动之前可以处理的最大任务数。用于防止内存泄漏。
--time-limit: 限制单个任务的最大执行时间(秒)。
代码语言:javascript
复制
celery  -A tasks worker -l INFO

 
 -------------- celery@C02G20FCMD6M v5.1.2 (sun-harmonics)
--- ***** ----- 
-- ******* ---- macOS-11.7.4-x86_64-i386-64bit 2023-08-30 02:35:35
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x10edd0550
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . celery demo run

[2023-08-30 02:35:35,451: INFO/MainProcess] Connected to redis://localhost:6379/0
[2023-08-30 02:35:35,455: INFO/MainProcess] mingle: searching for neighbors
[2023-08-30 02:35:36,469: INFO/MainProcess] mingle: all alone
[2023-08-30 02:35:36,484: INFO/MainProcess] celery@C02G20FCMD6M ready.
[2023-08-30 02:35:36,488: INFO/MainProcess] Task celery demo run[cf191a9d-ef91-46ee-b0c4-5153e853079d] received
[2023-08-30 02:35:36,510: INFO/ForkPoolWorker-1] Task celery demo run[39c2e810-4337-4133-b5ff-ea64dfc0f49f] succeeded in 0.016289540000000047s: 5
代码语言:javascript
复制
    再执行一次run_task 脚本,可以看到work 正常调度了
代码语言:javascript
复制
python run_task.py
task start....
task end....
f6137e38-35a2-4df7-be8a-cccd4035c293

[2023-08-30 02:37:33,978: INFO/MainProcess] Task celery demo run[f6137e38-35a2-4df7-be8a-cccd4035c293] received
[2023-08-30 02:37:33,980: INFO/ForkPoolWorker-8] Task celery demo run[f6137e38-35a2-4df7-be8a-cccd4035c293] succeeded in 0.00101882100000239s: 5

定时任务示例

在Celery中,定时任务通常被称为"periodic tasks",它允许你在指定的时间间隔内自动执行任务。

代码语言:javascript
复制
from celery import Celery
#定义消息代理(broker)的地址
BROKER_URL = 'redis://localhost:6379/0'
#定义结果后端(backend)的地址
BACKEND_URL = 'redis://localhost:6379/1'
#创建一个 Celery 应用实例
app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL, )
# 在 Celery 应用配置中设置消息序列化方式
app.conf.update(
    task_serializer='json',  # 任务消息序列化方式
    result_serializer='json',  # 任务结果序列化方式
)
# 在 Celery 应用配置中设置并发参数
app.conf.update(
    worker_concurrency=4,  # 同时执行的工作进程数量
    task_max_retries=3,  # 单个任务的最大执行次数(重试次数)
)


app.conf.beat_schedule = {
    'my-periodic-task': {
        'task': 'tasks.my_periodic_task',
        'schedule': 10.0,  # 每10秒执行一次
    },
}

@app.task
def my_periodic_task():
    print("test1111111")

启动beat 进程

代码语言:javascript
复制
celery -A tasks beat --loglevel=info

可以看到beat调度的任务日志

代码语言:javascript
复制
celery beat v5.1.2 (sun-harmonics) is starting.
__    -    ... __   -        _
LocalTime -> 2023-09-22 10:58:09
Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2023-09-22 10:58:09,863: INFO/MainProcess] beat: Starting...
[2023-09-22 10:58:09,883: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (tasks.my_periodic_task)
[2023-09-22 10:58:19,882: INFO/MainProcess] Scheduler: Sending due task my-periodic-task (tasks.my_periodic_task)
代码语言:javascript
复制
可以在worker 进程看到对应的打印的日志
代码语言:javascript
复制
[2023-09-22 11:00:19,884: INFO/MainProcess] Task tasks.my_periodic_task[9c8e8734-b738-4fb4-9dfb-67a97031b007] received
[2023-09-22 11:00:19,886: WARNING/ForkPoolWorker-2] into beat demo: 
[2023-09-22 11:00:19,886: WARNING/ForkPoolWorker-2] 

[2023-09-22 11:00:19,888: WARNING/ForkPoolWorker-2] test1111111
[2023-09-22 11:00:19,889: WARNING/ForkPoolWorker-2] 

[2023-09-22 11:00:19,893: INFO/ForkPoolWorker-2] Task tasks.my_periodic_task[9c8e8734-b738-4fb4-9dfb-67a97031b007] succeeded in 0.006949007999992318s: None

这篇文章我们先介绍下celery的基础入门,在后续的文章我们将继续学习下celery与django的结合使用与具体的案例。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 五分钟学SRE 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Celery的优点
  • Celery的架构
  • 消息代理
    • RabbitMQ
      • Redis
      • celery的安装与使用
        • 异步任务示例
          • 定时任务示例
          相关产品与服务
          云数据库 Redis
          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档