首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >芹菜预取任务被困在其他任务之后

芹菜预取任务被困在其他任务之后
EN

Stack Overflow用户
提问于 2021-11-16 10:25:28
回答 1查看 389关注 0票数 0

我在ECS集群上遇到了一个问题,包括多个芹菜工人,当集群需要升级时。

一些背景:

  • I有一项任务可能要运行几个小时。ECS集群上的
  • 芹菜工人目前使用Flower根据队列深度进行缩放。每当队列深度大于1时,它就会扩大工作人员以可能接收更多任务。代理使用的
  • 是Redis。
  • I已将worker_prefetch_multiplier设置为1,每个工人的并发性为4.

问题定义:由于这些设置,每个工作人员都预先获取4个任务,然后填充队列深度。假设我们有一个工作人员在运行,它需要在队列深度填充到第9个任务的1之前调用8个任务。4个任务将处于启动状态,4个任务将处于接收状态。每当将工作节点的数量增加到2个时,只将第九个任务发送给该工作人员。但是,这意味着处于接收状态的4个任务在启动状态下被“卡住”了几个小时,这是不可取的。

调查的解决方案:

  • 在搜索芹菜文档(https://docs.celeryproject.org/en/stable/userguide/optimizing.html)中的解决方案时发现,禁用预取的唯一方法是在任务中使用acks_late=True。它确实解决了不预取任务的问题,但也会导致其他问题,如在新缩放的工作节点上复制任务,这肯定不是我想要的。
  • 也经常将工作人员的-O fair设置为解决方案,但似乎它仍然在接收状态下创建任务。

目前,我正在考虑一个小的复杂的解决这个问题的办法,所以我很高兴听到其他解决办法。目前提出的解决方案是将并发设置为-c 2 (而不是-c 4)。这意味着将在第一个工作节点上预取2个任务,并启动2个任务。所有其他任务都将在队列中结束,需要缩放事件。一旦ECS扩展到两个工作节点,我将把第一个工作器的并发性从2扩展到4,释放预取任务。

有什么想法/建议吗?

EN

回答 1

Stack Overflow用户

发布于 2021-11-17 08:51:04

我已经找到了一个解决这个问题的方法(在下面的文章中:https://github.com/celery/celery/issues/6500)。我将在这里为和我有同样问题的人提供完整的答案。

解决方案: @samdoolin提供的解决方案是将使用者的can_consume功能设置为一个功能,只有当保留的请求少于工人所能处理的(工人的并发性)时,才能使用消息。在我的例子中,这意味着如果已经有4个请求处于活动状态,它就不会使用请求。相反,任何请求都会累积到队列中,从而产生预期的行为。然后,我可以根据队列深度轻松地缩放容纳单个工人的ECS容器的数量。

实际上,这看起来类似于(再次感谢@samdoolin):

代码语言:javascript
运行
复制
class SingleTaskLoader(AppLoader):

    def on_worker_init(self):
        # called when the worker starts, before logging setup
        super().on_worker_init()

        """
        STEP 1:
        monkey patch kombu.transport.virtual.base.QoS.can_consume()
        to prefer to run a delegate function,
        instead of the builtin implementation.
        """

        import kombu.transport.virtual

        builtin_can_consume = kombu.transport.virtual.QoS.can_consume

        def can_consume(self):
            """
            monkey patch for kombu.transport.virtual.QoS.can_consume

            if self.delegate_can_consume exists, run it instead
            """
            if delegate := getattr(self, 'delegate_can_consume', False):
                return delegate()
            else:
                return builtin_can_consume(self)

        kombu.transport.virtual.QoS.can_consume = can_consume

        """
        STEP 2:
        add a bootstep to the celery Consumer blueprint
        to supply the delegate function above.
        """

        from celery import bootsteps
        from celery.worker import state as worker_state

        class Set_QoS_Delegate(bootsteps.StartStopStep):

            requires = {'celery.worker.consumer.tasks:Tasks'}

            def start(self, c):

                def can_consume():
                    """
                    delegate for QoS.can_consume

                    only fetch a message from the queue if the worker has
                    no other messages
                    """
                    # note: reserved_requests includes active_requests
                    return len(worker_state.reserved_requests) == 0

                # types...
                # c: celery.worker.consumer.consumer.Consumer
                # c.task_consumer: kombu.messaging.Consumer
                # c.task_consumer.channel: kombu.transport.virtual.Channel
                # c.task_consumer.channel.qos: kombu.transport.virtual.QoS
                c.task_consumer.channel.qos.delegate_can_consume = can_consume

        # add bootstep to Consumer blueprint
        self.app.steps['consumer'].add(Set_QoS_Delegate)


# Create a Celery application as normal with the custom loader and any required **kwargs
celery = Celery(loader=SingleTaskLoader, **kwargs)

然后,我们通过下面的行启动工人:

代码语言:javascript
运行
复制
celery -A proj worker -c 4 --prefetch-multiplier -1

确保您不会忘记--prefetch-multiplier -1选项,它完全禁止获取新请求。这将确保它使用的是can_consume猴子区。

现在,当芹菜应用程序启动,您请求6个任务时,4个任务将按预期执行,2个任务将以队列结束,而不是预取。这是在没有实际设置acks_late=True的情况下预期的行为。

还有最后一个我想做的笔记。根据芹菜的文档,在命令行中启动工作人员时,也应该可以将路径传递给SingleTaskLoader。如下所示:

代码语言:javascript
运行
复制
celery -A proj --loader path.to.SingleTaskLoader worker -c 4 --prefetch-multiplier -1

对我来说,不幸的是,这是行不通的。但是,它可以通过实际将其传递给构造函数来解决。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69987419

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档