我正在运行一个烧瓶应用程序,旨在接收零星的,但繁重的,工作。我实现了一系列的码头容器运行烧瓶应用程序,芹菜,红(作为经纪人)和memcached (作为后端)。我使用芹菜将处理重量分成几个块,然后使用get()检索所有结果:
# Multithreading
jobs = group(processing_fn.s(c) for c in chunks)
result = jobs.apply_async()
while not result.ready() :
time.sleep(30)
resultset = result.get() 虽然这确实工作迅速和良好,但我在订购芹菜释放用于存储任务结果的RAM时遇到了麻烦。最终导致服务器耗尽内存,必须重新启动,这远远不是最佳的.。
我尝试对结果集(甚至对结果集中的每个结果)使用.forget:
result = result.get()
result.forget()
...
resultset = result.get()
for r in result :
r.forget()然而,这些都没有释放记忆..。有什么想法吗?
这里是如何实例化芹菜应用程序:
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker = "redis://redis:6379/",
backend = "cache+memcached://memcached:11211"
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
...
celery = make_celery(app)发布于 2020-06-16 03:33:06
默认情况下,芹菜将任何任务的结果存储1天:
参见:您可以通过: CELERY_TASK_RESULT_EXPIRES或https://docs.celeryproject.org/en/latest/userguide/configuration.html#result-expires对其进行调整。
https://stackoverflow.com/questions/62399561
复制相似问题