我们正在尝试建立一个基本的定向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取一个任务,对其进行处理,并确认消息。
问题是,处理过程可能需要10-20分钟,而我们当时没有响应消息,导致服务器断开与我们的连接。
下面是我们的消费者的一些伪代码:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
在第一个任务完成之后,在BlockingConnection内部的某个地方会抛出一个异常,抱怨套接字被重置。此外,RabbitMQ日志显示消费者因为没有及时响应而被断开连接(为什么它重置连接而不是发送FIN很奇怪,但我们不会担心这一点)。
我们搜索了很多,因为我们相信这是RabbitMQ的正常用例(有许多长时间运行的任务,应该在许多消费者之间分配),但似乎没有其他人真的有这个问题。最后,我们偶然发现了一个线程,其中建议使用心跳并在单独的线程中生成long_running_task()
。
所以代码变成了:
#!/usr/bin/env python
import pika
import time
import threading
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost',
heartbeat_interval=20))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def thread_func(ch, method, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
def callback(ch, method, properties, body):
threading.Thread(target=thread_func, args=(ch, method, body)).start()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
这似乎是可行的,但它非常混乱。我们确定ch
对象是线程安全的吗?此外,假设long_running_task()
正在使用该连接参数将一个任务添加到一个新队列中(即,这个长流程的第一部分已经完成,让我们将任务发送到第二部分)。因此,线程使用的是connection
对象。这个线程安全吗?
更重要的是,做这件事的首选方法是什么?我觉得这非常混乱,而且可能不是线程安全的,所以我们可能做得不对。谢谢!
发布于 2017-08-16 15:09:32
不要禁用心跳。
最好的解决方案是在一个单独的线程中运行该任务,并将prefetch_count
设置为1
,这样使用者就可以使用下面的channel.basic_qos(prefetch_count=1)
只得到一条未确认的消息
https://stackoverflow.com/questions/14572020
复制相似问题