RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据或者将作业排队以便让分布式服务器进行处理。应用程序通过使用消息队列可以有效的进行解耦。
消息由有效载荷和标签两部分组成,其中有效载荷是你传输的数据;标签是对有效载荷的描述,rabbitmq使用标签来决定谁将获得消息的拷贝。Rabbitmq的生产者和消费者工作方式如下图所示:
生产者创建消息,并将消息发布到代理服务器(rabbitmq)中,rabbitmq会根据标签把消息发送给对方。消费者连接到代理服务器上,并订阅到相应的队列上。rabbitmq会将消息发送给监听/订阅的消费者,消费者它接收到的是有效载荷。消息路由过程并没有将消息标签一同传递,如果你想知道具体生产者的话,可以将生产者的信息封装到有效载荷中。rabbitmq主要有三部分组成:交换器、队列和绑定。生产者生产的信息需要发布到交换器上,消息最终到达队列并被消费者接收,消息最终达到队列中并等待消费,绑定决定了消息如何从路由器路由到特定的队列上。原理如下图所示:
消费者通过两种方式从特定的队列中接收消息:
需要注意的是如果至少有一个消费者订阅了队列,消息会立即发送给订阅;如果该队列没有消费者订阅那么消息会一直存在队列中知道有消费者订阅到队列上,队列上的消息才发送给消费者。当rabbitmq一个队列有用多个消费者,消费者是以轮询的方式发送给消费者。消费者通过basic.ack命令显式的向rabbitmq发送一个确认,此时rabbitmq才能安全的把消息从队列上删除。在接收到信息后你想明确拒绝或者不确认收到该消息的有两种方式:
我们通过使用queue.declare命令来创建队列,两个重要的参数:
当我们需要检测一些队列是否存在我们可以通过queue.declare的passive设置为true,如果队列存在则成功返回;如果队列不存在则会返回一个错误。值得考虑的问题是,在常见队列时我们的队列应该是由消费者还是生产者来创建呢?这个问题需要考虑是你的应用程序是否能承担起消息丢失,如果能承担起消息的丢失,可以只让消费者声明队列;如果不能承担起消息的丢失那么生产者和消费都要声明队列。
消息通过交换器和绑定(路由键)确保到达制定的队列,服务器会通过路由键将消息从交换器投递到队列中。为了支持投递多个队列,rabbitmq由四种类型的交换器:direct、fanout、topic和headers,headers性能差很多,我们这里主要介绍:direct、fanout和topic三种交换器类型。
如果路由键匹配的话,将消息按照路由键发送到对应的队列中。如下图所示:
,其中服务器必须实现direct类型的交换器,包含一个空白字符串名称的默认交换器,当生命一个队列时,它会自动绑定到默认交换器,并以队列名称作为路由键。如果默认的direct交换器无法满足应用时,需要使用exchange.declae来设置。
交换器会将收到的消息广播到绑定的队列上。这样可以允许你通过同一个消息做相应的不同工作。如下图所示:
交换器使来自不同源头的消息能够到达同一个队列。如下图所示:
如果服务器出现故障或者重启,那么队列和交换器都会消失了。这是因为在每个队列和交换器的durable属性默认为false,它决定了rabbitmq在重启或者崩溃之后是否重新创建队列和交换器。能从AMQP服务器中恢复的消息,称之为持久化。如果让我们自己的rabbitmq设置为持久化,做到以下三点: