我尝试使用RabbitMQ发送消息和接收消息。我没有计算机科学背景,我用的术语不太准确。
我试图复制教程文件:当我提交html表单时,我的python脚本(cgi)将消息提交到队列中。
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = PN
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
connection.close()
我的接收器在运行:
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received Project %r" % body)
#ch.basic_ack(delivery_tag = method.delivery_tag)
if not (os.path.isfile(js_path)):
print (' [*] ERROR files missing ')
#ch.basic_ack(delivery_tag = method.delivery_tag)
return
p= subprocess.Popen(run a subprocess here)
p.wait()
print (' [*] Temporary Files removed')
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='task_queue',no_ack=True)
channel.start_consuming()
它管理大部分时间,但由于以下错误随机崩溃:
回溯(最近一次调用):文件"Receive5.py",第139行,在channel.start_consuming() File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py",第1681行,在start_consuming self.connection.process_data_events(time_limit=None) "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py",第647行在process_data_events self._flush_output(common_terminator)文件第426行中,在_flush_output line exceptions.ConnectionClosed() pika.exceptions.ConnectionClosed中
发布于 2016-05-30 14:25:48
这是因为您让主线程等待,并且因为这个pika无法处理传入的消息;在这种情况下,它无法响应心跳,直到子进程完成。这导致RabbitMQ认为客户端已经死了,并强制断开连接。
如果您想让它与心跳(这是推荐的)一起工作,则需要定期调用connection.process_data_events
。这可以通过添加一个循环来完成,该循环检查线程是否完成,并且每隔30分钟左右调用一次process_data_events
,直到线程完成为止。
发布于 2021-02-04 05:53:24
这是关于如何避免由于心跳而中断连接的pika文档。
在一个大于0.11.2的版本中,虽然我们可以在pika.ConnectionParameters: heartbeat_interval=600中添加一个参数,但是如果服务器端的心跳值短到60,那就无能为力了。它只能在版本至少为0.11.2时才能工作。
发布于 2017-05-13 18:58:37
看,添加这个https://github.com/mosquito/aio-pika
这是一个异步包装器,如果你理解异步背后的概念,很容易使用:)
https://stackoverflow.com/questions/37321089
复制相似问题