我正在开发一个小型但计算密集型的Python应用程序。计算密集型的工作可以被分成几个可以并发执行的部分。我正在尝试确定一个合适的堆栈来实现这一点。
目前,我正计划在Apache2+WSGI上使用一个带有芹菜的Flask应用程序作为任务队列。
在下面的示例中,如果有3个或更多的工作线程可用,a_long_process()、another_long_process()和yet_another_long_process()是否会同时执行?当进程正在执行时,Flask应用程序会被阻止吗?
在Flask应用程序中:
@myapp.route('/foo')
def bar():
task_1 = a_long_process.delay(x, y)
task_1_result = task_1.get(timeout=1)
task_2 = another_long_process.delay(x, y)
task_2_result = task_2.get(timeout=1)
task_3 = yet_another_long_process.delay(x, y)
task_3_result = task_3.get(timeout=1)
return task_1 + task_2 + task_3tasks.py:
from celery import Celery
celery = Celery('tasks', broker="amqp://guest@localhost//", backend="amqp://")
@celery.task
def a_long_process(x, y):
return something
@celery.task
def another_long_process(x, y):
return something_else
@celery.task
def yet_another_long_process(x, y):
return a_third_thing发布于 2013-01-30 01:28:04
您应该更改您的代码,以便工作人员可以并行工作:
@myapp.route('/foo')
def bar():
# start tasks
task_1 = a_long_process.delay(x, y)
task_2 = another_long_process.delay(x, y)
task_3 = yet_another_long_process.delay(x, y)
# fetch results
try:
task_1_result = task_1.get(timeout=1)
task_2_result = task_2.get(timeout=1)
task_3_result = task_3.get(timeout=1)
except TimeoutError:
# Handle this or don't specify a timeout.
raise
# combine results
return task_1 + task_2 + task_3此代码将一直阻塞,直到所有结果可用(或到达超时时间)。
在进程执行时,Flask应用程序是否会被阻止?
这段代码只会阻塞你的WSGI容器中的一个worker。整个站点是否没有响应取决于您正在使用的WSGI容器。(例如Apache + mod_wsgi、uWSGI、gunicorn等)大多数WSGI容器都会产生多个worker,因此在您的代码等待任务结果时,只有一个worker会被阻塞。
对于这类应用程序,我推荐使用gevent,它为每个请求生成一个单独的greenlet,而且非常轻量级。
发布于 2013-01-30 01:19:32
根据result.get()的文档,它会等到结果准备好后才返回,所以通常它实际上是阻塞的。但是,由于您使用的是timeout=1,因此如果任务花费的时间超过1秒,则对get()的调用将引发TimeoutError。
默认情况下,芹菜工作进程从设置为equal to the number of CPUs available的并发级别开始。并发级别似乎决定了可用于处理任务的线程数量。因此,使用并发级别的>= 3,芹菜工人似乎应该能够并发处理这么多任务(也许更有芹菜专业知识的人可以验证这一点?)。
发布于 2014-07-17 20:40:04
使用芹菜画布的Group功能:
组原语是一个签名,它接受应该并行应用的任务列表。
以下是文档中提供的示例:
from celery import group
from proj.tasks import add
g = group(add.s(2, 2), add.s(4, 4))
res = g()
res.get()它输出[4, 8]。
https://stackoverflow.com/questions/14588253
复制相似问题