前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Celery 用来处理工作流和多个队列

Celery 用来处理工作流和多个队列

作者头像
用户4235284
发布2023-10-14 16:14:25
3100
发布2023-10-14 16:14:25
举报
文章被收录于专栏:后端学习之道

Celery 是一个与django很好地集成的异步任务队列。在这篇文章中,我不会写一篇关于如何设置和使用 celery 的教程,已经有很多文章了。我将讨论我在我从事的一些项目中使用的 celery 的一些高级功能。

任务的分组和链接

考虑一个场景,你正在做一个电子商务项目,你想编写一个任务来更新产品详细信息,并且只在所有更新时调用 API 来更新状态。一种方法是在没有 celery 的情况下编写 cron 作业,但这将是同步的。每个产品都会阻塞线程,直到它完成。但是,对于 celery group primitives,它将是异步的,即将为每个产品创建一个新任务,并且它们异步运行而不会相互阻塞。

以下是我们更新产品详细信息的功能

代码语言:javascript
复制
@app.task(name="update_product_details" )
def update_product_details(product_id):
    try:
        product_info = make_http_call(product_id)
        obj = Product.objects.get(id=product_id)
        for key, val in product_info.items():
            setattr(obj, key, val)
            obj.save()
    except Exception:
        return {"status": False, "message" : "error in updating"}
    return {"status": True, "message": "successfully updated"}

下面是每天运行的用于更新产品详细信息的 cron。

代码语言:javascript
复制
from celery import group, chain

def cron_to_update_product(products):
    group_tasks = []
    for product in products:
        group_tasks.append(update_product_details.s(product.product_id))
    async_result = chain(
        group(group_tasks), update_status_through_callback.s()
    ).apply_async()
    print(
        "a task with id %s is created to update product details" % async_result.task_id
    )
代码分解

.s- 添加到任务称为signature

group(group_tasks)- 芹菜创建n产品数量,其中n产品数量为。所有这些任务将并发执行而不会相互阻塞。

chain(group(group_tasks), update_status_through_callback.s())- 顾名思义,任务是按顺序执行的。一旦组中的所有任务都完成,然后update_status_through_callback运行

apply_async- 运行任务

这里有一个关键点需要注意,函数update_status_through_callback应该grouped_result作为第一个参数。grouped_result将是所有分组任务的返回值列表。 例如,有 5 个组任务运​​行,其中 3 个失败。然后grouped_result将看起来像这样

代码语言:javascript
复制
[
    {"status": False, "message": "error in updating"},
    {"status": True, "message": "successfully updated"},
    {"status": False, "message": "error in updating"},
    {"status": True, "message": "successfully updated"},
    {"status": False, "message": "error in updating"},
]

最后,我们的update_status_through_callback样子是这样的

代码语言:javascript
复制
def update_status_through_callback(grouped_result):
    if not all([result["status"] for result in grouped_result]):
        return {"status": False,
                "message": "not all products are updated"
        }
    response = make_http_call()
    if not is_valid_response(rseponse):
        return {"status": False,
                "message": "error in making callback"
               }
    return {"status": True, "message": "updated status"}

在函数的第一行,我们检查是否所有组任务都已成功执行,因为我们应该只更新所有产品的状态。

任务路由

我们都使用像这样的简单命令来运行 celery celery worker -A proj_name。当项目的任务数量较少时,只运行一个工人规模。但是,考虑一下您正在从事电子商务项目的相同场景,您想要运行不同类型的报告。假设您只运行一个队列,很少有报告会花费很多时间(说出它们long_running_tasks),而很少有报告会花费更少的时间(说出它们short_running_tasks)。假设当你得到很多long_running_tasks使得队列填满并且short_running_tasks必须等到他们完成。这可能无法很好地扩展。因此,可扩展的解决方案是为每种报告类型创建单独的队列。但是这种方法也有一个问题。如果没有针对特定报告类型的任务,运行这些队列是一种资源浪费。因此,根据业务用例使用第一种方法还是第二种方法是一种权衡。

要根据报告类型运行多个队列,您需要使用此芹菜配置

代码语言:javascript
复制
CELERY_BROKER_URL = "redis://localhost:6379" # if your broker
# is different change this
CELERY_RESULT_BACKEND = "redis://localhost:6379" # change this
# if this is different for you
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TASK_DEFAULT_QUEUE = "default"
CELERY_TASK_DEFAULT_EXCHANGE = "default"
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_TASK_DEFAULT_ROUTING_KEY = "task.default"

CELERY_TASK_ROUTES = {  
  "foo.tasks.report_type1_aggregator": {
      "queue": "report_type1_aggregator_queue"
  },
  "foo.tasks.report_type2_aggregator": {
      "queue": "report_type2_aggregator_queue"
  },
  "foo.tasks.report_type3_aggregator": {
      "queue": "report_type3_aggregator_queue"
  },
  "foo.tasks.report_type1_report_queue": {
      "queue": "report_type1_report_queue"
  },
  "foo.tasks.report_type2_report_queue": {
      "queue": "report_type2_report_queue"
  },
  "foo.tasks.report_type3_report_queue": {
      "queue": "report_type3_report_queue"
  }
}

我们为每个任务定义了路线并为其分配了一个队列。但是,我们还没有在 celery 中创建工人。我们可以使用以下命令创建工人

代码语言:javascript
复制
celery worker -A proj_name -O fair -Q {queue_name}
 -P gevent --autoscale=32,16 --loglevel=INFO 
 --logfile={queue_name}_celery.log

对我们定义的所有队列重复上述命令。

提示 :

不要运行许多命令,而是使用该celery multi实用程序。此处给出示例

现在,让我们创建一个默认工作人员

代码语言:javascript
复制
celery worker -A proj_name -O fair -Q default 
-P gevent --loglevel=INFO --logfile=celery.log

就是这样!当您运行任务时,它们将被路由到相应的队列。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-10-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 任务的分组和链接
    • 代码分解
    • 任务路由
      • 提示 :
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档