首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >芹菜弦在apply_async后不释放redis pubsub通道

芹菜弦在apply_async后不释放redis pubsub通道
EN

Stack Overflow用户
提问于 2018-05-31 13:20:58
回答 1查看 371关注 0票数 0

为了响应请求,我在django应用程序中从celery启动了一个chord。chord可以正确执行,但django从未释放pub-sub通道。杀死django服务器将释放该频道,然后该频道将从redis-cli pubsub channels中消失。

本地的

  • Celery 4.1.1或4.2.0rc4
  • Redis 4.0.9
  • Python本地版,1个celery worker,1个api server
  • Results在这种情况下无关紧要(但文档上说不要忽略它们)
  • 完整示例项目,地址:https://github.com/awbacker/celerychord-issue

在点击/api/start/并在运行celery的选项卡中查看任务完成后,我看到还有5个通道。杀死django会移除通道,杀死芹菜工人对通道没有影响。

redis-cli pubsub channels
1) "celery-task-meta-chord-lphsmq-chunk-4-14"
2) "celery-task-meta-chord-lphsmq-chunk-2-12"
3) "celery-task-meta-chord-lphsmq-chunk-3-13"
4) "celery-task-meta-chord-lphsmq-chunk-1-11"
5) "celery-task-meta-chord-lphsmq-chunk-0-10"

我看到当一切正常时,通道仍然存在,所以不会抛出错误。

有没有人看到我做错了什么?我知道芹菜中报告了一些问题,但我不确定这是不是从他们那里得出的:

代码:

# --- endpoint.py -------------------------------------------
chord_key = get_random_string(6, string.ascii_lowercase)
all_tasks = celery.chord(
    task_id="chord-%s" % chord_key,
    header=celery.group(
        tasks.process_chunk.subtask(args=(x,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
        for i, x in enumerate(range(10, 15))
    ),
    # immutable = ignore results from parent
    body=celery.chain(
        tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
        tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
    )
)
result = all_tasks.apply_async()
return Response(data=dict(chord_key=chord_key, result=repr(result)))

# --- tasks.py ----------------------------------------------
@celery_app.task(bind=True, ignore_result=False)
def process_chunk(self, x):
    logging.error(" ~ executing process-chunk: %s" % x)
    return x * 2


@celery_app.task(bind=True, ignore_result=False)
def post_step_1(self, y):
    logging.error(" ~ executing post-step-1")
    return y * 3


@celery_app.task(bind=True, ignore_result=False)
def post_step_2(self, z):
    logging.error(" ~ executing post-step-2")
    return z * 5
EN

回答 1

Stack Overflow用户

发布于 2018-05-31 17:14:11

你的Chord看起来很复杂,也许这就是为什么芹菜很难实现的原因,我建议你自己实现chord逻辑,它不是很复杂。试试这个吧。我基本上是在等待使用chord机制安装的任务

# --- endpoint.py ------------------------------------------- 
chain_tasks = celery.chain(
            tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
            tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True)
            .apply_async()
chain_result= chain_tasks.get() // WAIT TO FINISH

group_task = celery.group(tasks.process_chunk.subtask(args=(chain_result,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
            for i, x in enumerate(range(10, 15)).apply_async()
group_result = group_task.get()
return Response(data=dict(chord_key=chord_key, result=repr(group_result)))

我不确定这是否是你想要达到的效果,但我认为只要稍作调整,它就会奏效。祝好运。

票数 -2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50617142

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档