Celery是一个Python任务队列系统,用于处理跨线程或网络节点的工作任务分配。它使异步任务管理变得容易。您的应用程序只需要将消息推送到像RabbitMQ这样的代理,Celery worker会弹出它们并安排任务执行。
Celery可以用于多种配置。最常见的用途是通过在分布在群集中的Celery worker上运行资源密集型任务来进行横向应用程序扩展,或者管理Web应用程序中的长异步任务,例如用户发布图像时生成缩略图。本指南将向您介绍Celery的安装和使用,其中包含使用Python 3,Celery 4.1.0和RabbitMQ将文件下载委派给Celery worker的示例应用程序。
sudo
尽可能使用。完成“ 保护您的服务器 ”部分以创建标准用户帐户,加强SSH访问并删除不必要的网络服务。注意本指南是为非root用户编写的。需要提升权限的命令带有前缀
sudo
。如果您不熟悉该sudo
命令,请参阅“ 用户和组”指南。
Celery可从PyPI获得。最简单和推荐的方法是安装它pip
。为简单起见,您可以进行系统范围的安装,或者如果您的系统上运行其他Python应用程序,则可以使用虚拟环境。最后一种方法基于每个项目安装库,并防止版本与其他应用程序冲突。
如果主机不运行具有特定版本库要求的其他python应用程序,则选择系统范围的安装。使用以下命令安装Celery:
pip install celery
如果您的主机上正在运行其他Python应用程序,并且您希望基于每个项目管理库,请使用虚拟环境安装。本指南将使用Anaconda,但Virtualenv也是一个不错的选择。
注意如果您使用虚拟环境,请不要忘记在处理项目时使用步骤3激活您的环境。本指南中的所有命令都假定已激活Celery虚拟环境。
apt
。以下命令将使用可接受的默认配置安装和启动RabbitMQ:
sudo apt-get install rabbitmq-serverrabbitmq-server.noarch
软件包,使服务在引导时启动并启动RabbitMQ服务器:
sudo yum install rabbitmq-server.noarch systemctl enable rabbitmq-server systemctl start rabbitmq-server这将使用默认配置安装RabbitMQ。
Celery应用程序由两部分组成:
任务在模块中定义,该模块将由服务端和客户端使用。worker将运行代码来执行任务,客户端将仅使用函数定义来公开它们并隐藏RabbitMQ发布复杂性。
downloaderApp
来保存我们的新python模块,以及一个downloadedFiles
存储下载文件的目录:
mkdir ~/downloadedFiles ~/downloaderApp; cd ~/downloaderAppdownloaderApp.py
将包含两个功能模块,download
并且list
,这将是异步任务。替换celery
在BASEDIR
与您的系统用户名路径。
〜/ downloaderApp / downloaderApp.py 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
from celery import Celery import urllib.request import os # Where the downloaded files will be stored BASEDIR="/home/celery/downloadedFiles" # Create the app and set the broker location (RabbitMQ) app = Celery('downloaderApp', backend='rpc://', broker='pyamqp://guest@localhost//') @app.task def download(url, filename): """ Download a page and save it to the BASEDIR directory url: the url to download filename: the filename used to save the url in BASEDIR """ response = urllib.request.urlopen(url) data = response.read() with open(BASEDIR+"/"+filename,'wb') as file: file.write(data) file.close() @app.task def list(): """ Return an array of all downloaded files """ return os.listdir(BASEDIR)所有的模块都发生在@app.task
注释中。这告诉celery这个函数不会在客户端上运行,而是通过RabbitMQ发送给worker。所有Celery配置都发生在以下行中:
app = Celery('downloaderApp', backend='rpc://', broker='pyamqp://guest@localhost//')
这一行创建:
downloaderApp
broker
本地主机上的A 将通过* 高级消息队列协议(AMQP)接受消息,该协议是RabbitMQ使用的协议backend
,其中worker将存储任务的返回值,以便客户端可以在以后检索它(请记住任务执行是异步的)。如果省略backend
,任务仍将运行,但返回值将丢失。rpc
表示响应将以远程过程调用模式发送到RabbitMQ队列。该命令celery worker
用于启动Celery工作程序。该-A
标志用于设置包含Celery应用程序的模块。worker将读取模块并使用Celery()
调用中的参数连接到RabbitMQ 。
delay()
方法向RabbitMQ提交作业,然后使用该ready()
函数确定任务是否完成:
from downloaderApp import download,list r = download.delay('https://www.python.org/static/community_logos/python-logo-master-v3-TM.png', 'python-logo.png') r.ready()list
任务。使用该get()
函数获取结果:
from downloaderApp import download,list r = list.delay() r.ready() r.get(timeout=1)
如果省略该timeout
参数,客户端将等待任务以同步方式完成。这是不好的做法,应该避免。在具有多个worker的生产环境中,应该对worker进行守护,以便在服务器启动时自动启动它们。
sudo
,在中创建一个新的服务定义文件/etc/systemd/system/celeryd.service
。根据您的实际用户和组名更改User
和Group
属性:
/etc/systemd/system/celeryd.service 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
[Unit] Description=Celery Service After=network.target [Service] Type=forking User=celery Group=celery EnvironmentFile=/etc/default/celeryd WorkingDirectory=/home/celery/downloaderApp ExecStart=/bin/sh -c '${CELERY_BIN} multi start ${CELERYD_NODES} \ -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \ --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}' ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait ${CELERYD_NODES} \ --pidfile=${CELERYD_PID_FILE}' ExecReload=/bin/sh -c '${CELERY_BIN} multi restart ${CELERYD_NODES} \ -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \ --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}' [Install] WantedBy=multi-user.target/etc/default/celeryd
配置文件:
在/ etc /默认/ celeryd 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# The names of the workers. This example create two workers CELERYD_NODES="worker1 worker2" # The name of the Celery App, should be the same as the python file # where the Celery tasks are defined CELERY_APP="downloaderApp" # Log and PID directories CELERYD_LOG_FILE="/var/log/celery/%n%I.log" CELERYD_PID_FILE="/var/run/celery/%n.pid" # Log level CELERYD_LOG_LEVEL=INFO # Path to celery binary, that is in your virtual environment CELERY_BIN=/home/celery/miniconda3/bin/celery/home/celery/downloaderApp
:
from downloaderApp import download,list r1 = download.delay('https://www.linode.com/media/images/logos/standard/light/linode-logo_standard_light_large.png', 'linode-logo.png') r2 = list.delay() r2.get(timeout=1)
根据您输入命令的速度,list
任务的worker可以在worker执行download
任务之前完成,您可能无法在列表中看到Linode徽标。查看日志文件,如步骤7,您将看到哪个worker处理了每个任务。该celery
二进制提供一些命令来监视工人和任务,远远超过浏览日志文件更方便:
rusage
密钥下的工作者资源使用情况,或total
密钥下完成的总任务。
celery -A downloaderApp inspect statsFlower是一种基于Web的监视工具,可用于代替celery
命令。
public
:
firewall-cmd --get-active-zones--port
标志更改:
cd /home/celery/downloaderApp celery -A downloaderApp flower --port=5555localhost:5555
以查看仪表板:
注意如果Flower通过公共IP地址公开,请务必采取其他步骤通过反向代理保护此功能。
Celery的易用性来自于@task
将Celery方法添加到函数对象的装饰器。这种魔法不能用于每种编程语言,因此Celery提供了另外两种与Worker通信的方法:
@task
当您调用celery方法时,装饰器会向代理发送消息.delay()
。有些语言提供了为您执行此任务的模块,包括NodeJS的 node-celery或PHP的 celery-php。您可以使用curl
练习如何使用Flower API进行交互。
/api/task/async-apply
端点使得到的应用程序的任务之一异步调用,在这种情况下doanloaderApp.download
。您可以使用进行同步通话/task/api/apply
。您可以在官方API文档中找到Flower API端点的完整列表。
有关此主题的其他信息,您可能需要参考以下资源。虽然提供这些是希望它们有用,但请注意,我们无法保证外部托管材料的准确性或及时性。