前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式任务管理系统 Celery 之一

分布式任务管理系统 Celery 之一

作者头像
用户1278550
发布2018-08-01 17:37:32
1.5K0
发布2018-08-01 17:37:32
举报
文章被收录于专栏:idbaidba

一 前言

开发自动化管理平台的过程中,有执行时间较长的任务比如安装基础软件,备份恢复;有定时执行的任务比如定期收集元数据,检查慢日志数量等等,我们可以自己开发一套任务系统,当然也可以依赖Celery 实现上述功能。

二 Celery 是什么?

2.1 概念

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度,支持异步执行任务。更令人欣喜的是常见的Python的web框架都能和Celery 耦合,给广大开发者带来极大的便利。Celery 是开源的,使用 BSD 许可证 授权。

2.2 原理

Celery 实现异步调用的原理核心其实是将任务执行单元 worker 和 任务派发单元 分开,从而达到异步的效果;

Celery将需要执行的任务发送到消息队列中,然后再由任务执行单元根据具体的配置(绑定到具体哪个队列,默认为defaults)从消息队列中获取任务执行,这样就实现了异步的效果。

2.3 架构

Celery 使用简洁的模块架构提供了完整的功能,上手容易,部署简单。主要包含消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store).我们使用一张图展示Celery运行机制。

task producer - 任务生产者

顾名思义就是发起调度任务的,然后交给任务队列去处理。简单的Python代码、耦合在Django/Flask Web 服务里请求任务比如调用备份或者调用初始化安装机器的任务,在程序里面调用Celery任务装饰的函数,产生任务并分发到任务队列处理的,我们都可以称之为任务生产者。

celery beat - 任务调度器

Celery beat 是 Celery 系统自带的任务生产者,它以独立进程的形式存在,该进程会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。需要注意的是在一个Celery系统中,只能存在一个 Celery beat 调度器。

broker - 任务代理

其实broker就是一个队列存储,是负责接收task producer发送的任务消息,存储到队列之后再进行调度,分发给任务消费方 (celery worker)。常见的broker有RabbitMQ、Redis 等。

celery worker - 任务消费方

Celery worker 就是任务的执行者,它负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。Celery worker 对应的就是操作系统中的一个进程。Celery 支持分布式部署和横向扩展,我们可以在多个节点增加 Celery worker 的数量来增加系统的高可用性。在分布式系统中,我们也可以在不同节点上分配执行不同任务的 Celery worker 来达到模块化的目的。

Result Stores/backend

存储Celery worker 执行任务之后的结果和状态信息,以供应用系统查询任务的状态信息。Celery 内置支持Django ORM,Redis,RabbitMQ 等方式来保存任务处理后的状态信息。

三 快速入门

3.1 安装

本例子使用redis作为存储任务消息的介质,需要提前安装redis 并启动相关服务。

用 pip 安装:

pip install -U Celery

用 easy_install 安装:

easy_install -U Celery

3.2 部署

我们先使用一个单一程序文件包含所有celery相关的配置作为例子。要使用celery,就需要先初始化一个celery实例,配置好broker和backend为redis。编写程序文件。目录结构

python/

tasks.py

client.py

tasks.py import time from celery import Celery app = Celery('tasks', broker='redis://localhost:6379/1', backend='redis://localhost:6379/1') @app.task def add(x, y): time.sleep(2) return x + y @app.task def mul(x, y): time.sleep(5) return x * y

在和tasks 同一层级执行

celery -A tasks worker -l info

这里需要说明的是

命令行执行celery worker -A app -l info时, app 必须可导入,app 可以为py模块或包,本例为tasks 。不管是包还是模块都必须正确指定Celery入口文件(如果为包则默认的入口文件名为 celery.py )的绝对导入名称(proj.celery),但是从工程上我们推荐在包的__init__.py 文件进行celey的初始化。

另起一个命令行 本例子使用ipython

In [5]: from tasks import add,mul In [6]: add.delay(3,3) Out[6]: <AsyncResult: 037e25c2-3801-4edb-9b4f-917187325f47> In [7]: add.delay(3,7) Out[7]: <AsyncResult: 844a9785-2419-4480-bf56-178c957a3d7b> In [8]: r=add.delay(6,10) In [10]: r.status Out[10]: u'SUCCESS' In [11]: r.result Out[11]: 16 In [14]: r.ready() Out[14]: True In [15]: r.get() Out[15]: 16

从例子可以看到,直接调用add 函数并未直接返回 6 ,10 这样的算术结果而是返回 AsyncResult 对象。程序中可以使用这个对象检查任务状态,等待任务执行完成,获取任务结果,如果任务失败,它会返回异常信息或者调用栈。

将AsyncResult赋值给r, 可以通过调用 r.get() 或者 r.result 取结任务的结果,r.status,r.ready()获取任务的执行状态。

我们在优化一下上面的调用方式,编写一段小程序 client.py ,做同步调用 当然我们也可以做异步调用。

#!/usr/bin/env python #-*- coding:utf-8 -*- import time from tasks import add,mul add_ret = add.delay(4, 4) mul_ret = mul.delay(5, 5) while not add_ret.ready(): time.sleep(1) print "waiting for task to be done " print 'add task done: {0}'.format(add_ret.get()) while not mul_ret.ready(): time.sleep(2) print "waiting for task to be done " print 'mul task done: {0}'.format(mul_ret.get())

执行 python client.py

celery 输出的log如下

因为tasks任务中add 和mul函数都设置了等待 sleep,可以看出调用 add_ret.ready() 的时候并未直接返回结果,而是等待了具体的时间之后才返回。真实项目中我们需要改写 client.py ,利用Celery的异步执行特性。

四 小结

本文浅显的介绍了celery的架构和如何使用。Celery并不是一个队列,而是一套任务管理平台,通过队列实现任务的异步功能。有计划开发自己独立运维平台的还没有使用过celery朋友可以尝试用起来。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-11-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档