我在找人帮忙。我希望修复一个insert_order_queue()
函数,以便能够在消息实际上没有被传递到服务器的情况下将消息发送给RabbitMQ
。
这是我目前的代码:
def insert_order_queue(self, msg):
''' Insert message into the queue '''
if msg:
msg_props = pika.BasicProperties(delivery_mode=conf.rabbit_msg_props_delivery_mode,
content_type=conf.rabbit_msg_props_content_type)
logger.info('Message : %s' % msg)
try:
self.channel.basic_publish(body=json.dumps(msg),
exchange=conf.rabbit_exchange_name,
properties=msg_props,
routing_key=conf.rabbit_exchange_routing_key)
except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error:
logger.error('AMQP Connection failed. Trying again... %s' % error)
self._connect()
return
else:
logger.error('Something wrong')
这是我的_connect()
方法:
def _connect(self):
''' Connecting to the RabbitMQ, and declare queue '''
logger.info('Trying to connect to RabbitMQ')
while True:
try:
conn_broker = pika.BlockingConnection(
pika.ConnectionParameters(
host=conf.rabbit_server,
port=conf.rabbit_port,
virtual_host=conf.rabbit_vhost,
ssl=conf.rabbit_ssl, # do not set it to True if there is no ssl!
heartbeat_interval=conf.rabbit_heartbeat_interval,
credentials=pika.PlainCredentials(
conf.rabbit_user,
conf.rabbit_pass)))
logger.info('Successfully connected to Rabbit at %s:%s' % (conf.rabbit_server, conf.rabbit_port))
channel = conn_broker.channel()
# Don't dispatch a new message to a worker until it has processed and acknowledged the previous one
channel.basic_qos(prefetch_count=conf.rabbit_prefetch_count)
status = channel.queue_declare(queue=conf.rabbit_queue_name,
durable=conf.rabbit_queue_durable,
exclusive=conf.rabbit_queue_exclusive,
passive=conf.rabbit_queue_passive)
if status.method.message_count == 0:
logger.info("Queue empty")
else:
logger.info('Queue status: %s' % status)
channel.queue_bind(
queue=conf.rabbit_queue_name,
exchange=conf.rabbit_exchange_name,
routing_key=conf.rabbit_exchange_routing_key)
return channel
except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error:
time.sleep(3)
logger.error('Exception while connecting to Rabbit %s' % error)
else:
break
发布于 2013-07-25 18:04:10
有几种方式不能“传递”信息。
最明显的是“连接到兔子是关闭的”,在这种情况下,您只是重新连接和重发(您已经有了大部分的逻辑在重连接方面,只需要重新发送消息)。
然后,有几个变体“没有人在听这条信息”。这些可以通过basic_publish上的即时和强制性标志来处理。有关更多信息,请参见此:http://bunnyamqp.wordpress.com/2009/08/21/amqp-basic-publish-immediate-versus-mandatory/
最后,您可以添加一个确认回调。Pika允许您设置这个回调:
https://github.com/pika/pika/blob/master/pika/channel.py#L387
从该回调中,您可以决定是否再次发送消息。
https://stackoverflow.com/questions/17864616
复制相似问题