消息在短时间内被发布到队列中:~10条消息/秒,然后几分钟内没有消息。消费者有时会花费30秒来处理一条消息。我的简单消费者代码是:
import pika, os, time
url = os.environ.get('CLOUDAMQP_URL')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()
def callback(ch, method, properties, body):
print("Received " + str(body), method, properties)
# ... long task that is equivalent to:
time.sleep(30)
queue_name = 'test-queue'
channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)
channel.basic_consume(queue_name, callback, auto_ack=True)
channel.start_consuming()
connection.close()
有时,我会看到以下行为:
pika.exceptions.StreamLostError: Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')
(以下为全面回溯)
我通过手动禁用auto_ack=True
和破坏消息来解决我的问题(见下文)。
是否有其他方法来解决这个问题?发生EOF异常是因为CloudAMQP/ RabbitMQ服务器没有及时获得心跳,并关闭了连接吗?或者这是一个内部超时的皮卡..?谢谢!
回溯:
Traceback (most recent call last):
File "/app/app.py", line 146, in <module>
pika_obj['pika_channel'].start_consuming()
File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1866, in start_consuming
self._process_data_events(time_limit=None)
File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2027, in _process_data_events
self.connection.process_data_events(time_limit=time_limit)
File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 825, in process_data_events
self._flush_output(common_terminator)
File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')
发布于 2021-12-09 23:07:00
我能够通过引入一个简单的更改来修复上面的代码:在处理每一条消息之后设置auto_ack=False
和手动调用basic_ack
:
url = os.environ.get('CLOUDAMQP_URL')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()
def callback(ch, method, properties, body):
print("Received " + str(body), method, properties)
# ... long task that is equivalent to:
time.sleep(30)
# ack the message manually
ch.basic_ack(delivery_tag=method.delivery_tag)
queue_name = 'test-queue'
channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)
channel.basic_consume(queue_name, callback, auto_ack=False)
channel.start_consuming()
connection.close()
https://stackoverflow.com/questions/70297858
复制相似问题