为了响应请求,我在django
应用程序中从celery启动了一个chord
。chord可以正确执行,但django从未释放pub-sub通道。杀死django服务器将释放该频道,然后该频道将从redis-cli pubsub channels
中消失。
本地的
在点击/api/start/
并在运行celery的选项卡中查看任务完成后,我看到还有5个通道。杀死django会移除通道,杀死芹菜工人对通道没有影响。
redis-cli pubsub channels
1) "celery-task-meta-chord-lphsmq-chunk-4-14"
2) "celery-task-meta-chord-lphsmq-chunk-2-12"
3) "celery-task-meta-chord-lphsmq-chunk-3-13"
4) "celery-task-meta-chord-lphsmq-chunk-1-11"
5) "celery-task-meta-chord-lphsmq-chunk-0-10"
我看到当一切正常时,通道仍然存在,所以不会抛出错误。
有没有人看到我做错了什么?我知道芹菜中报告了一些问题,但我不确定这是不是从他们那里得出的:
代码:
# --- endpoint.py -------------------------------------------
chord_key = get_random_string(6, string.ascii_lowercase)
all_tasks = celery.chord(
task_id="chord-%s" % chord_key,
header=celery.group(
tasks.process_chunk.subtask(args=(x,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
for i, x in enumerate(range(10, 15))
),
# immutable = ignore results from parent
body=celery.chain(
tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
)
)
result = all_tasks.apply_async()
return Response(data=dict(chord_key=chord_key, result=repr(result)))
# --- tasks.py ----------------------------------------------
@celery_app.task(bind=True, ignore_result=False)
def process_chunk(self, x):
logging.error(" ~ executing process-chunk: %s" % x)
return x * 2
@celery_app.task(bind=True, ignore_result=False)
def post_step_1(self, y):
logging.error(" ~ executing post-step-1")
return y * 3
@celery_app.task(bind=True, ignore_result=False)
def post_step_2(self, z):
logging.error(" ~ executing post-step-2")
return z * 5
发布于 2018-05-31 17:14:11
你的Chord看起来很复杂,也许这就是为什么芹菜很难实现的原因,我建议你自己实现chord逻辑,它不是很复杂。试试这个吧。我基本上是在等待使用chord机制安装的任务
# --- endpoint.py -------------------------------------------
chain_tasks = celery.chain(
tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True)
.apply_async()
chain_result= chain_tasks.get() // WAIT TO FINISH
group_task = celery.group(tasks.process_chunk.subtask(args=(chain_result,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
for i, x in enumerate(range(10, 15)).apply_async()
group_result = group_task.get()
return Response(data=dict(chord_key=chord_key, result=repr(group_result)))
我不确定这是否是你想要达到的效果,但我认为只要稍作调整,它就会奏效。祝好运。
https://stackoverflow.com/questions/50617142
复制相似问题