前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pika.exceptions.ConnectionClosed 问题

pika.exceptions.ConnectionClosed 问题

作者头像
周小董
发布2019-03-25 11:37:18
2.7K0
发布2019-03-25 11:37:18
举报

最近一个处理程序又遇到 pika.exceptions.ConnectionClosed 这个问题,

image.png
image.png

查阅资料,https://stackoverflow.com/questions/37321089/rabbitmq-pika-exceptions-connectionclosed/37528066

这里面给说是因为主进程长时间等待,rabbitmq没有得到响应,就关闭了这个链接,所以根据里面给出的方法,定期调用

connection.process_data_events()

来查看进程完成状态和告知mq保持链接,pika.exceptions.ConnectionClosed这异常可能产生的原因有很多,最好去查看rabbitmq的日志,以做具体的解决方法,另外这里有一些异常的例子供参考:

https://programtalk.com/python-examples/pika.exceptions.ConnectionClosed/

        try:
            self.channel.start_consuming() # blocking call
        except pika.exceptions.ConnectionClosed: # when connection is lost, e.g. rabbitmq not running
            self.connect()
            self.clear_message_queue() #could this make problems if the manager replies too fast?
            # rabbitmq建立连接
            credentials = pika.PlainCredentials(username=RMQ_username, password=RMQ_password)
            self.parameters = pika.ConnectionParameters(host=RMQ_host, port=RMQ_port, credentials=credentials,heartbeat=0)
            self.connection = pika.BlockingConnection(self.parameters)
            try:
                channel = self.connection.channel()
            except pika.exceptions.ConnectionClosed:
                self.connection = pika.BlockingConnection(self.parameters)
                channel = self.connection.channel()
            except Exception as e:
                self.connection = pika.BlockingConnection(self.parameters)
                channel = self.connection.channel()
            # 创建broker
            channel.exchange_declare(exchange=exchange, exchange_type='direct', durable=True)
            # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
            channel.queue_declare(queue=queue, durable=True)
            # 把队列和中间人绑定
            channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key)
            # 交换机; 队列名,写明将消息发往哪个队列; 消息内容
            # routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列
            channel.basic_publish(exchange=exchange, routing_key=routing_key, body=json.dumps(dict(item),sort_keys=True,indent =31,ensure_ascii=False))

使用 pika 连接 rabbitmq 在执行长时间任务时,会出现下面的错误:

pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")

原因是 rabbitmq 会关闭长时间没有通信的 pika 连接。

网上说可以通过将 heartbeat 设为 0,关闭 rabbitmq 的心跳检测,但我试了下没有效果,看来还是应该手动执行 heartbeat。

看文档发现 pika 的 connection 有 process_data_events 方法,类似 heartbeat 操作,可以保持与 rabbitmq 的通信。在执行长时间任务时,定时调用 process_data_events 方法,就不会丢失连接。

实现 在 consumer 程序的子线程中执行长时间的任务,在主线程中定时检测子线程是否结束,如果没结束,则调用 process_data_events 方法。代码如下:

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=host,
        port=port
    )
)
 
channel = connection.channel()
channel.exchange_declare(
    exchange=exchange_name,
    exchange_type="topic"
)
channel.queue_declare(
    queue=queue_name
)
channel.queue_bind(
    exchange=exchange_name,
    queue=queue_name,
    routing_key=routing_key
)
 
def consume_message(ch, method, properties, body):
    logger.info('receive new message')
    message_string = body.decode('utf-8')
    message = json.loads(message_string)
    message_thread = threading.Thread(target=process_message, args=(message, config))
    message_thread.start()
    while message_thread.is_alive():
        time.sleep(10)
        connection.process_data_events()
        # logger.info("waiting for message thread...")
    logger.info("message thread done")
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    consume_message,
    queue=queue_name,
    no_ack=True
)
 
try:
    logger.info("starting receiving message...")
    channel.start_consuming()
except KeyboardInterrupt as e:
    logger.info(e)
finally:
    logger.info("Warm shutdown...")
    connection.close()
    logger.info("Warm shutdown...Done")

参考:https://windrocblog.sinaapp.com/?p=2101

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年07月14日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档