RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用中间共享数据,RabbitMQ 是使用 Erlang 语言来编写的,并且 RabbitMQ 是基于 AMQP 协议的。
特点:
AMQP (Advanced Message Queuing Protocol) 即高级消息队列协议,是一个进程间传递异步消息的网络协议。
工作过程如下:首先发布者(Publisher)发布消息(Message),经由交换机 Exchange。交换机根据路由规则将收到的消息分发给与该交换机绑定的 Queue。最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
关于 AMQP 模型的几点说明:
消息生产者(Producer),向 Broker 发送消息的客户端。
消息消费者(Consumer),从 Broker 消费消息的客户端。
一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
Exchange 即交换器,是用来发送消息的 AMQP 实体。Exchange 拿到一个消息之后将它路由给一个或零个队列。Exchange 使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。
Producer 将消息发给 Exchange 时,一般会指定一个 RoutingKey (路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和 BindingKey (绑定键) 联合使用才能最终生效。
RabbitMQ 中通过 Binding (绑定) 将 Exchange 与 Queue(消息队列) 关联起来,在绑定时一般会指定一个 BindingKey,这样 RabbitMQ 就知道如何正确将消息路由到 Queue 中。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则。
生产者将消息发送给交换器,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。注意BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。
Exchange 有以下 4 种类型,不同的类型对应着不同的路由策略:
Exchange 默认类型。路由规则是把消息路由到 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
以上图为例,如果发送消息的时候 RoutingKey="booking",那么消息会路由到 Queue1 和 Queue2。如果在发送消息的时候设置 RoutingKey="create" 或 "confirm",消息只会路由到Queue2。如果以其他的 RoutingKey 发送消息,则消息不会路由到这两个队列中。
路由规则是把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
direct 类型的 Exchange 路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。
topic 类型的 Exchange 在匹配规则上进行了扩展,它与 direct 类型的 Exchange 相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
.
分隔的字符串,其中被 .
分隔开的每一段独立的字符串称为一个单词;.
分隔的字符串;*
和 #
,用于做模糊匹配,其中 *
用于匹配一个单词, #
用于匹配零个或多个单词。以上图为例,如果发送消息的时候 RoutingKey 为
headers 类型的 Exchange 不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的 Exchange 性能会很差,不推荐使用。
Queue 其实是 Message Queue 即消息队列,保存消息并将它们转发给消费者。Queue 是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其消费。
RabbitMQ 中消息只能存储在队列中,而 Kafka 将消息存储在 Topic 中,即该 Topic 对应的 Partition 中。RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
当多个消费者订阅同一个队列时,队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
Queue 跟 Exchange 共享某些属性,但是队列也有一些另外的属性:
队列在声明(declare)后才能被使用。
如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出。
持久化队列(Durable Queues)会被存储在磁盘上,当消息代理(Broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient Queues)。并不是所有的场景和案例都需要将队列持久化。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。
AMQP 代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议:
当拒绝某条消息时,应用可以告诉消息代理销毁该条消息或者重新将该条消息放入队列。
当此队列只有一个消费者时,有可能存在拒绝消息并将消息重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况。
在多个消费者共享一个队列时,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。
AMQP 模型中的消息(Message)对象是带有属性(Attributes)的:
属性 | 说明 |
---|---|
Content type | 内容类型 |
Content encoding | 内容编码 |
Routing key | 路由键 |
Delivery mode (persistent or not) | 投递模式(持久化 或 非持久化) |
Message priority | 消息优先权 |
Message publishing timestamp | 消息发布的时间戳 |
Expiration period | 消息有效期 |
Publisher application id | 发布应用的 ID |
有些属性是被 AMQP 代理所使用的,但是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称作消息头(headers)。和 HTTP 协议的 X-Headers 很相似,消息属性需要在消息被发布的时候定义。
AMQP 的消息除属性外,也含有一个有效载荷 Payload(消息实际携带的数据),它被 AMQP 代理当作不透明的字节数组来对待。
消息代理不会检查或者修改 Payload,消息可以只包含属性而不携带有效载荷,它通常会使用类似 JSON 这种序列化的格式数据。
消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。
简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能损失)。
启动服务器
rabbitmq-server start &
停止服务器
rabbitmqctl stop_app
http://localhost:15672/
# 用户名 guest
# 密码 guest
关闭应用
rabbitmqctl stop_app
启动应用
rabbitmqctl start_app
查看节点状态
rabbitmqctl status
添加用户
rabbitmqctl add_user username password
删除用户
rabbitmqctl delete_user username
列出所有用户
rabbitmqctl list_users
清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
列出用户权限
rabbitmqctl list_user_permissions username
修改密码
rabbitmqctl change_password username newpassword
设置用户权限
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
创建虚拟主机
rabbitmqctl add_vhost vhostpath
删除虚拟主机
rabbitmqctl delete_vhost vhostpath
列出所有虚拟主机
rabbitmqctl list_vhosts
列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath
查看所有队列信息
rabbitmqctl list_queues
清除队列里的消息
rabbitmqctl -p vhostpath purge_queue blue
移除所有数据
rabbitmqctl reset
# 要在 rabbitmqctl stop_app 之后使用
组成集群命令
rabbitmqctl join_cluster <clusternode> [--ram]
查看集群状态
rabbitmq cluster_status
修改集群节点的存储形式
rabbitmqctl change_cluser_node_type disc | ram
摘除节点(忘记节点)
rabbitmqctl forget_cluster_node [--offline]
修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2]
消息落库,对消息状态进行打标。
消息的延迟投递,做二次确认,回调检查。
消息的确认是指生产者投递消息后,如果 Broker 收到消息,则会给生产者一个应答,生产者进行接收应答,用来确定这条消息是否正常地发送到 Broker。
实现机制:
第一步:在 channel 上开启确认模式
channel.confirmSelect()
第二步:在 channel 上添加监听
channel.addConfirmListener()
监听成功和失败的返回结果,根据具体的结果对消息进行重新发送或记录日志等后续处理。
消息生产者通过制动一个 Exchange 和 routing key,把消息送达到某一个队列中去,然后消费者监听队列,进行消费处理操作。
在某些情况下,如果我们在发送消息的时候,当前的 Exchange 不存在或者指定的 routing key路由不到,此时我们需要监听这种不可达的消息,就要使用 Return Listener。
基础 API 有一个配置项 mandatory
RabbitMQ 提供了一种 QoS(服务质量保证) 功能,在非自动确认消息的前提下,如果一定数目的消息(通过基于 Consume 或者 Channel 设置 QoS 值)未被确认前,不进行消费新的消息。
涉及到的方法:
void BasicQoS(unit prefetchSize,ushort prefetchCount,bool global)
注意:
TTL(Time To Live)即生存时间。
利用 DLX,当消息在一个队列中变成死信(dead message)之后,其能被重新 publish 到另一个 Exchange,这个 Exchange 就是 DLX。
消息变成死信的几种情况:
注意:
DLX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。
死信队列设置需要设置 Exchange 和 队列,然后绑定
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
然后我们进行正常声明 Exchange、队列和绑定,此时需要在队列上加上参数 arguments
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);