我在ECS集群上遇到了一个问题,包括多个芹菜工人,当集群需要升级时。
一些背景:
worker_prefetch_multiplier
设置为1,每个工人的并发性为4.。
问题定义:由于这些设置,每个工作人员都预先获取4个任务,然后填充队列深度。假设我们有一个工作人员在运行,它需要在队列深度填充到第9个任务的1之前调用8个任务。4个任务将处于启动状态,4个任务将处于接收状态。每当将工作节点的数量增加到2个时,只将第九个任务发送给该工作人员。但是,这意味着处于接收状态的4个任务在启动状态下被“卡住”了几个小时,这是不可取的。
调查的解决方案:
acks_late=True
。它确实解决了不预取任务的问题,但也会导致其他问题,如在新缩放的工作节点上复制任务,这肯定不是我想要的。-O fair
设置为解决方案,但似乎它仍然在接收状态下创建任务。。
目前,我正在考虑一个小的复杂的解决这个问题的办法,所以我很高兴听到其他解决办法。目前提出的解决方案是将并发设置为-c 2
(而不是-c 4
)。这意味着将在第一个工作节点上预取2个任务,并启动2个任务。所有其他任务都将在队列中结束,需要缩放事件。一旦ECS扩展到两个工作节点,我将把第一个工作器的并发性从2扩展到4,释放预取任务。
有什么想法/建议吗?
发布于 2021-11-17 08:51:04
我已经找到了一个解决这个问题的方法(在下面的文章中:https://github.com/celery/celery/issues/6500)。我将在这里为和我有同样问题的人提供完整的答案。
解决方案: @samdoolin提供的解决方案是将使用者的can_consume
功能设置为一个功能,只有当保留的请求少于工人所能处理的(工人的并发性)时,才能使用消息。在我的例子中,这意味着如果已经有4个请求处于活动状态,它就不会使用请求。相反,任何请求都会累积到队列中,从而产生预期的行为。然后,我可以根据队列深度轻松地缩放容纳单个工人的ECS容器的数量。
实际上,这看起来类似于(再次感谢@samdoolin):
class SingleTaskLoader(AppLoader):
def on_worker_init(self):
# called when the worker starts, before logging setup
super().on_worker_init()
"""
STEP 1:
monkey patch kombu.transport.virtual.base.QoS.can_consume()
to prefer to run a delegate function,
instead of the builtin implementation.
"""
import kombu.transport.virtual
builtin_can_consume = kombu.transport.virtual.QoS.can_consume
def can_consume(self):
"""
monkey patch for kombu.transport.virtual.QoS.can_consume
if self.delegate_can_consume exists, run it instead
"""
if delegate := getattr(self, 'delegate_can_consume', False):
return delegate()
else:
return builtin_can_consume(self)
kombu.transport.virtual.QoS.can_consume = can_consume
"""
STEP 2:
add a bootstep to the celery Consumer blueprint
to supply the delegate function above.
"""
from celery import bootsteps
from celery.worker import state as worker_state
class Set_QoS_Delegate(bootsteps.StartStopStep):
requires = {'celery.worker.consumer.tasks:Tasks'}
def start(self, c):
def can_consume():
"""
delegate for QoS.can_consume
only fetch a message from the queue if the worker has
no other messages
"""
# note: reserved_requests includes active_requests
return len(worker_state.reserved_requests) == 0
# types...
# c: celery.worker.consumer.consumer.Consumer
# c.task_consumer: kombu.messaging.Consumer
# c.task_consumer.channel: kombu.transport.virtual.Channel
# c.task_consumer.channel.qos: kombu.transport.virtual.QoS
c.task_consumer.channel.qos.delegate_can_consume = can_consume
# add bootstep to Consumer blueprint
self.app.steps['consumer'].add(Set_QoS_Delegate)
# Create a Celery application as normal with the custom loader and any required **kwargs
celery = Celery(loader=SingleTaskLoader, **kwargs)
然后,我们通过下面的行启动工人:
celery -A proj worker -c 4 --prefetch-multiplier -1
确保您不会忘记--prefetch-multiplier -1
选项,它完全禁止获取新请求。这将确保它使用的是can_consume
猴子区。
现在,当芹菜应用程序启动,您请求6个任务时,4个任务将按预期执行,2个任务将以队列结束,而不是预取。这是在没有实际设置acks_late=True
的情况下预期的行为。
还有最后一个我想做的笔记。根据芹菜的文档,在命令行中启动工作人员时,也应该可以将路径传递给SingleTaskLoader
。如下所示:
celery -A proj --loader path.to.SingleTaskLoader worker -c 4 --prefetch-multiplier -1
对我来说,不幸的是,这是行不通的。但是,它可以通过实际将其传递给构造函数来解决。
https://stackoverflow.com/questions/69987419
复制相似问题