前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何使用Celery和RabbitMQ设置任务队列

如何使用Celery和RabbitMQ设置任务队列

作者头像
沈唁
发布2018-09-21 11:56:00
4.8K0
发布2018-09-21 11:56:00
举报
文章被收录于专栏:沈唁志

Celery是一个Python任务队列系统,用于处理跨线程或网络节点的工作任务分配。它使异步任务管理变得容易。您的应用程序只需要将消息推送到像RabbitMQ这样的代理,Celery worker会弹出它们并安排任务执行。

Celery可以用于多种配置。最常见的用途是通过在分布在群集中的Celery worker上运行资源密集型任务来进行横向应用程序扩展,或者管理Web应用程序中的长异步任务,例如用户发布图像时生成缩略图。本指南将向您介绍Celery的安装和使用,其中包含使用Python 3,Celery 4.1.0和RabbitMQ将文件下载委派给Celery worker的示例应用程序。

开始之前

  1. 熟悉我们的入门指南并完成设置Linode主机名和时区的步骤。
  2. 本指南将sudo尽可能使用。完成“ 保护您的服务器 ”部分以创建标准用户帐户,加强SSH访问并删除不必要的网络服务。
  3. 更新您的系统: sudo apt update && sudo apt upgrade

注意本指南是为非root用户编写的。需要提升权限的命令带有前缀sudo。如果您不熟悉该sudo命令,请参阅“ 用户和组”指南。

安装Python 3环境

  1. 下载并安装Miniconda: curl -OL https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh bash Miniconda3-latest-Linux-x86_64.sh
  2. 在安装过程中,系统会多次提示您。查看条款和条件,并为每个提示选择“是”。
  3. 重新启动shell会话以使PATH的更改生效。
  4. 检查你的Python版本: python --version

安装Celery

Celery可从PyPI获得。最简单和推荐的方法是安装它pip。为简单起见,您可以进行系统范围的安装,或者如果您的系统上运行其他Python应用程序,则可以使用虚拟环境。最后一种方法基于每个项目安装库,并防止版本与其他应用程序冲突。

系统范围安装

如果主机不运行具有特定版本库要求的其他python应用程序,则选择系统范围的安装。使用以下命令安装Celery:

代码语言:javascript
复制
pip install celery

在Python虚拟环境中安装

如果您的主机上正在运行其他Python应用程序,并且您希望基于每个项目管理库,请使用虚拟环境安装。本指南将使用Anaconda,但Virtualenv也是一个不错的选择。

  1. 创建您的虚拟环境: conda create -n celeryenv
  2. 激活您的虚拟环境: source activate celeryenv 您的shell提示符将更改以指示您正在使用的环境
  3. 在虚拟环境中安装Celery: pip install celery

注意如果您使用虚拟环境,请不要忘记在处理项目时使用步骤3激活您的环境。本指南中的所有命令都假定已激活Celery虚拟环境。

安装RabbitMQ

  • 在Debian / Ubuntu上:
    • 安装RabbitMQ apt。以下命令将使用可接受的默认配置安装和启动RabbitMQ: sudo apt-get install rabbitmq-server
  • 在CentOS上:
    • 安装rabbitmq-server.noarch软件包,使服务在引导时启动并启动RabbitMQ服务器: sudo yum install rabbitmq-server.noarch systemctl enable rabbitmq-server systemctl start rabbitmq-server

    这将使用默认配置安装RabbitMQ。

编写Celery应用程序

Celery应用程序由两部分组成:

  • Workers是等待的RabbitMQ消息并执行任务。
  • 向RabbitMQ提交消息以触发任务执行的客户端,并最终在以后检索结果

任务在模块中定义,该模块将由服务端和客户端使用。worker将运行代码来执行任务,客户端将仅使用函数定义来公开它们并隐藏RabbitMQ发布复杂性。

  1. 创建一个目录downloaderApp来保存我们的新python模块,以及一个downloadedFiles存储下载文件的目录: mkdir ~/downloadedFiles ~/downloaderApp; cd ~/downloaderApp
  2. 创建一个downloaderApp.py将包含两个功能模块,download并且list,这将是异步任务。替换celeryBASEDIR与您的系统用户名路径。 〜/ downloaderApp / downloaderApp.py 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from celery import Celery import urllib.request import os # Where the downloaded files will be stored BASEDIR="/home/celery/downloadedFiles" # Create the app and set the broker location (RabbitMQ) app = Celery('downloaderApp', backend='rpc://', broker='pyamqp://guest@localhost//') @app.task def download(url, filename): """ Download a page and save it to the BASEDIR directory url: the url to download filename: the filename used to save the url in BASEDIR """ response = urllib.request.urlopen(url) data = response.read() with open(BASEDIR+"/"+filename,'wb') as file: file.write(data) file.close() @app.task def list(): """ Return an array of all downloaded files """ return os.listdir(BASEDIR)

所有的模块都发生在@app.task注释中。这告诉celery这个函数不会在客户端上运行,而是通过RabbitMQ发送给worker。所有Celery配置都发生在以下行中:

代码语言:javascript
复制
app = Celery('downloaderApp', backend='rpc://', broker='pyamqp://guest@localhost//')

这一行创建:

  • Celery应用程序命名 downloaderApp
  • broker本地主机上的A 将通过* 高级消息队列协议(AMQP)接受消息,该协议是RabbitMQ使用的协议
  • 一个响应backend,其中worker将存储任务的返回值,以便客户端可以在以后检索它(请记住任务执行是异步的)。如果省略backend,任务仍将运行,但返回值将丢失。rpc表示响应将以远程过程调用模式发送到RabbitMQ队列。

启动Workers

该命令celery worker用于启动Celery工作程序。该-A标志用于设置包含Celery应用程序的模块。worker将读取模块并使用Celery()调用中的参数连接到RabbitMQ 。

  1. 使用以下命令以调试模式启动worker: celery -A downloaderApp worker --loglevel=debug
  2. 打开另一个ssh会话来运行客户端(如果需要,不要忘记激活你的虚拟环境),转到你的模块文件夹并启动一个python shell: cd ~/downloaderApp python
  3. 在python shell中,调用delay()方法向RabbitMQ提交作业,然后使用该ready()函数确定任务是否完成: from downloaderApp import download,list r = download.delay('https://www.python.org/static/community_logos/python-logo-master-v3-TM.png', 'python-logo.png') r.ready()
  4. 退出python shell,检查是否已下载python徽标: ls ~/downloadedFiles
  5. 再次启动python shell并运行list任务。使用该get()函数获取结果: from downloaderApp import download,list r = list.delay() r.ready() r.get(timeout=1) 如果省略该timeout参数,客户端将等待任务以同步方式完成。这是不好的做法,应该避免。

启动Workers作为守护进程

在具有多个worker的生产环境中,应该对worker进行守护,以便在服务器启动时自动启动它们。

  1. 使用sudo,在中创建一个新的服务定义文件/etc/systemd/system/celeryd.service。根据您的实际用户和组名更改UserGroup属性: /etc/systemd/system/celeryd.service 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 [Unit] Description=Celery Service After=network.target [Service] Type=forking User=celery Group=celery EnvironmentFile=/etc/default/celeryd WorkingDirectory=/home/celery/downloaderApp ExecStart=/bin/sh -c '${CELERY_BIN} multi start ${CELERYD_NODES} \ -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \ --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}' ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait ${CELERYD_NODES} \ --pidfile=${CELERYD_PID_FILE}' ExecReload=/bin/sh -c '${CELERY_BIN} multi restart ${CELERYD_NODES} \ -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \ --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}' [Install] WantedBy=multi-user.target
  2. 创建/etc/default/celeryd配置文件: 在/ etc /默认/ celeryd 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # The names of the workers. This example create two workers CELERYD_NODES="worker1 worker2" # The name of the Celery App, should be the same as the python file # where the Celery tasks are defined CELERY_APP="downloaderApp" # Log and PID directories CELERYD_LOG_FILE="/var/log/celery/%n%I.log" CELERYD_PID_FILE="/var/run/celery/%n.pid" # Log level CELERYD_LOG_LEVEL=INFO # Path to celery binary, that is in your virtual environment CELERY_BIN=/home/celery/miniconda3/bin/celery
  3. 创建日志和pid目录: sudo mkdir /var/log/celery /var/run/celery sudo chown celery:celery /var/log/celery /var/run/celery
  4. 重新加载systemctl守护程序。每次更改服务定义文件时都应该运行此命令。 sudo systemctl daemon-reload
  5. 启用服务以在启动时启动: sudo systemctl enable celeryd
  6. 启动服务 sudo systemctl start celeryd
  7. 检查您的worker是否通过日志文件运行: cat /var/log/celery/worker1.log cat /var/log/celery/worker2.log
  8. 从目录中的python shell中向两个worker发送一些任务/home/celery/downloaderApp: from downloaderApp import download,list r1 = download.delay('https://www.linode.com/media/images/logos/standard/light/linode-logo_standard_light_large.png', 'linode-logo.png') r2 = list.delay() r2.get(timeout=1) 根据您输入命令的速度,list任务的worker可以在worker执行download任务之前完成,您可能无法在列表中看到Linode徽标。查看日志文件,如步骤7,您将看到哪个worker处理了每个任务。

监控您的Celery集群

celery二进制提供一些命令来监视工人和任务,远远超过浏览日志文件更方便:

  1. 使用status命令获取worker列表: celery -A downloaderApp status worker1@celery: OK worker2@celery: OK celery@celery: OK
  2. 使用inspect active命令查看worker正在执行的操作: celery -A downloaderApp inspect active -> worker1@celery: OK - empty - -> worker2@celery: OK - empty - -> celery@celery: OK - empty -
  3. 使用inspect stats命令获取有关worker的统计信息。它提供了大量信息,例如rusage密钥下的工作者资源使用情况,或total密钥下完成的总任务。 celery -A downloaderApp inspect stats

使用Flower Permalink监视Celery群集

Flower是一种基于Web的监视工具,可用于代替celery命令。

  1. 安装花: pip install wheel flower
  2. 如果您运行CentOS,则需要在Flower端口上打开防火墙(默认为5555)。如果您使用Debian,请跳过此步骤:
    1. 获取您当前的区域,通常是public: firewall-cmd --get-active-zones
    2. 打开端口5555.根据您的配置更改区域: sudo firewall-cmd --zone=public --add-port=5555/tcp --permanent
    3. 重新加载防火墙: sudo firewall-cmd --reload
  3. 使用Celery应用程序导航到该目录并启动Flower。5555是默认端口,但可以使用--port标志更改: cd /home/celery/downloaderApp celery -A downloaderApp flower --port=5555
  4. 将浏览器指向localhost:5555以查看仪表板:
花截图
花截图

注意如果Flower通过公共IP地址公开,请务必采取其他步骤通过反向代理保护此功能。

从其他语言启动Celery任务

Celery的易用性来自于@task将Celery方法添加到函数对象的装饰器。这种魔法不能用于每种编程语言,因此Celery提供了另外两种与Worker通信的方法:

  1. Webhooks:Flower提供了一个API,允许您通过REST HTTP查询与Celery进行交互。
  2. AMQP@task当您调用celery方法时,装饰器会向代理发送消息.delay()。有些语言提供了为您执行此任务的模块,包括NodeJS的 node-celery或PHP的 celery-php

您可以使用curl练习如何使用Flower API进行交互。

  1. 启动Flower,如果尚未运行: cd /home/celery/downloaderApp celery -A downloaderApp flower --port=5555
  2. 通过任务API提交下载: curl -X POST -d '{"args":["http://www.celeryproject.org/static/img/logo.png","celery-logo.png"]}' 'http://localhost:5555/api/task/async-apply/downloaderApp.download?refresh=True' {"task-id": "f29ce7dd-fb4c-4f29-9adc-f834250eb14e", "state": "PENDING"} 该/api/task/async-apply端点使得到的应用程序的任务之一异步调用,在这种情况下doanloaderApp.download。您可以使用进行同步通话/task/api/apply
  3. 在浏览器中打开Flower UI,看到该任务已被接受。

您可以在官方API文档中找到Flower API端点的完整列表。

更多信息

有关此主题的其他信息,您可能需要参考以下资源。虽然提供这些是希望它们有用,但请注意,我们无法保证外部托管材料的准确性或及时性。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 开始之前
  • 安装Python 3环境
  • 安装Celery
    • 系统范围安装
      • 在Python虚拟环境中安装
      • 安装RabbitMQ
      • 编写Celery应用程序
      • 启动Workers
      • 启动Workers作为守护进程
      • 监控您的Celery集群
      • 使用Flower Permalink监视Celery群集
      • 从其他语言启动Celery任务
      • 更多信息
      相关产品与服务
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档