We want to use Celery to listen on an SQS queue and process the incoming events as tasks.
This is the celeryconfig.py file:
from kombu import (
    Exchange,
    Queue
)
broker_transport = 'sqs'
broker_transport_options = {'region': 'us-east-1'}
worker_concurrency = 10
accept_content = ['application/json']
result_serializer = 'json'
content_encoding = 'utf-8'
task_serializer = 'json'
worker_enable_remote_control = False
worker_send_task_events = True
result_backend = None
task_queues = (
    Queue('re.fifo', exchange=Exchange('consume', type='direct'), routing_key='consume'),
)
task_routes = {'consume': {'queue': 're.fifo'}}
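
The question does not show how the worker app actually loads this module; normally a config module like this is wired to the app with config_from_object. A minimal sketch, assuming celeryconfig.py is importable from the worker's working directory (this wiring is not shown in the question):

# Sketch only: attach the config module to the Celery app.
from celery import Celery

app = Celery(__name__)
app.config_from_object('celeryconfig')  # loads broker_transport, task_queues, etc.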

This is the celery.py file:

from celery.utils.log import get_task_logger
from celery import Celery
app = Celery(__name__)
logger = get_task_logger(__name__)
@app.task(routing_key='consume', name="consume", bind=True, acks_late=True, ignore_result=True)
def consume(self, msg):
    print('Message received')
    logger.info('Message received')
    # DO SOMETHING WITH THE RECEIVED MESSAGE
    # print('this is the new message', msg)
    return True
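
As a side note, the task body itself can be sanity-checked without any broker by running it eagerly; a small hypothetical snippet (the module name tasks is a placeholder, since the question defines the task in celery.py):

# Hypothetical local check: execute the task eagerly, in-process, with no SQS involved.
from tasks import consume  # placeholder module name; the question uses celery.py

consume.apply(args=[{"test": "test"}])  # should print/log "Message received"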

We are pushing events onto SQS with the aws CLI:

aws --endpoint-url http://localhost:9324 sqs send-message --queue-url http://localhost:9324/queue/re.fifo --message-group-id owais --message-deduplication-id test18 --message-body {\"test\":\"test\"}

The events reach the Celery worker, but our consume task is never invoked, and we want it to be called.

How can we get the consume task invoked for events coming from SQS? Any help would be appreciated.
Posted on 2022-06-18 05:32:14
Messages pushed to SQS with the aws CLI are not recognized as tasks by the Celery worker. You need to call consume.delay(msg) to push the message to SQS; then your worker will be able to pick it up.
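
Celery uses its own message protocol: the envelope carries the task name, id and arguments, so a raw send-message body is delivered to the queue but never dispatched to any task. A minimal producer sketch, assuming the task is importable and the same celeryconfig is loaded (the module name tasks is hypothetical; the question defines the task in celery.py):

# Hypothetical producer script: publish through Celery so the message is wrapped
# in Celery's task envelope (task name, id, args) that the worker understands.
from tasks import consume  # hypothetical module name; the question uses celery.py

consume.delay({"test": "test"})  # queues the task on SQS for the re.fifo worker

# Equivalent call with explicit routing, if needed:
consume.apply_async(args=[{"test": "test"}], queue='re.fifo')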
https://stackoverflow.com/questions/68918133