最近一个处理程序又遇到 pika.exceptions.ConnectionClosed 这个问题,
查阅资料,https://stackoverflow.com/questions/37321089/rabbitmq-pika-exceptions-connectionclosed/37528066
这里面给说是因为主进程长时间等待,rabbitmq没有得到响应,就关闭了这个链接,所以根据里面给出的方法,定期调用
connection.process_data_events()
来查看进程完成状态和告知mq保持链接,pika.exceptions.ConnectionClosed这异常可能产生的原因有很多,最好去查看rabbitmq的日志,以做具体的解决方法,另外这里有一些异常的例子供参考:
https://programtalk.com/python-examples/pika.exceptions.ConnectionClosed/
try:
self.channel.start_consuming() # blocking call
except pika.exceptions.ConnectionClosed: # when connection is lost, e.g. rabbitmq not running
self.connect()
self.clear_message_queue() #could this make problems if the manager replies too fast?
# rabbitmq建立连接
credentials = pika.PlainCredentials(username=RMQ_username, password=RMQ_password)
self.parameters = pika.ConnectionParameters(host=RMQ_host, port=RMQ_port, credentials=credentials,heartbeat=0)
self.connection = pika.BlockingConnection(self.parameters)
try:
channel = self.connection.channel()
except pika.exceptions.ConnectionClosed:
self.connection = pika.BlockingConnection(self.parameters)
channel = self.connection.channel()
except Exception as e:
self.connection = pika.BlockingConnection(self.parameters)
channel = self.connection.channel()
# 创建broker
channel.exchange_declare(exchange=exchange, exchange_type='direct', durable=True)
# 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue=queue, durable=True)
# 把队列和中间人绑定
channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key)
# 交换机; 队列名,写明将消息发往哪个队列; 消息内容
# routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=json.dumps(dict(item),sort_keys=True,indent =31,ensure_ascii=False))
使用 pika 连接 rabbitmq 在执行长时间任务时,会出现下面的错误:
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")
原因是 rabbitmq 会关闭长时间没有通信的 pika 连接。
网上说可以通过将 heartbeat 设为 0,关闭 rabbitmq 的心跳检测,但我试了下没有效果,看来还是应该手动执行 heartbeat。
看文档发现 pika 的 connection 有 process_data_events 方法,类似 heartbeat 操作,可以保持与 rabbitmq 的通信。在执行长时间任务时,定时调用 process_data_events 方法,就不会丢失连接。
实现 在 consumer 程序的子线程中执行长时间的任务,在主线程中定时检测子线程是否结束,如果没结束,则调用 process_data_events 方法。代码如下:
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=host,
port=port
)
)
channel = connection.channel()
channel.exchange_declare(
exchange=exchange_name,
exchange_type="topic"
)
channel.queue_declare(
queue=queue_name
)
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
routing_key=routing_key
)
def consume_message(ch, method, properties, body):
logger.info('receive new message')
message_string = body.decode('utf-8')
message = json.loads(message_string)
message_thread = threading.Thread(target=process_message, args=(message, config))
message_thread.start()
while message_thread.is_alive():
time.sleep(10)
connection.process_data_events()
# logger.info("waiting for message thread...")
logger.info("message thread done")
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
consume_message,
queue=queue_name,
no_ack=True
)
try:
logger.info("starting receiving message...")
channel.start_consuming()
except KeyboardInterrupt as e:
logger.info(e)
finally:
logger.info("Warm shutdown...")
connection.close()
logger.info("Warm shutdown...Done")