“ 本篇文章针对RabbitMQ入门级使用,主要由有以下两个知识点:
1. RabbitMQ中的角色以及相关概念
2. RabbitMQ的基本API介绍
”
上面一张图概括了消息的流转过程
生产消息的应用,生产者需要指定将消息发送到哪个exchange,并且指定routingkey(这是为了exchange可以将消息路由到相关的队列)。
消息由两部分组成:标签和消息体。标签指的就是exchange、routingkey、ttl等一系列用来描述该消息的集合,消息体就是消息的实际内容。
这里指的就是RabbitMQ服务实例
消费者需要订阅队列,消费消息的时候只会看到消息体的内容,标签在路由过程中会被丢弃。
队列在RabbitMQ中是用来存储消息的(Kafka的消息存储级别在topic,所谓的队列其实只是topic存储文件中消息偏移量的标识)。
队列中的消息会被平摊给多个消费者,一条消息只会发给其中一个消费者。
这在RabbitMQ中是一层抽象的东西,并不实际存在。交换器用来接收消息,并且将消息路由到一个或多个队列。
路由键:生产者在发送消息的时候会指定消息的路由规则,只有路由键符合绑定键时,消息才能正确的从交换器路由到队列。
绑定键指的是交换器和队列是如何绑定的,RabbitMQ才能知道消息怎样路由到队列。绑定键有时候是无效的,需要依赖于交换器的类型(fanout的交换器就会忽略绑定键)
fanout:该交换器会将消息路由到所有与该交换器绑定的队列
direct:交换器会将消息路由到BindingKey和RoutingKey完全一致的队列
topic:交换器会将消息根据一定的路由规则路由到相关队列,该队列约束如下
header:该交换器不依赖于路由键的匹配规则来路由消息,在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式)对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列 。不建议使用该类型的交换器。
以下流程都是通俗的流程,实际的操作流程应因项目和公司规范来实际操作。
生产者和消费者和RabbitMQ进行连接即为一个Connection(TCP连接),生产者、消费者和RabbitMQ之间的数据传输都是通过Channel来进行,因此需要建立一个Channel。
Channel可以理解为基于一个TCP连接而建立的多个虚拟连接。
假设一个应用程序有很多个线程需要从RabbitMQ消费或者生产消息,如果一个线程一个TCP连接,将会产生很大的性能问题。通过信道可以实现TCP连接复用,减少性能开销。
每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection,将这些信道均摊到这些Connection中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。
RabbitMQ就是AMQP协议的Erlang的实现。AMQP协议具体可以分为三层:
当客户端与Broker建立连接的时候,会调用factory.newConnection方法,这个方法会进一步封装成Protocol Header 0-9-1的报文头发送给Broker,以此通知Broker本次交互采用的是AMQPO-9-1协议,紧接着Broker返回Connection.Start来建立连接,在连接的过程中涉及Connection.Start/.Start-OK、Connection.Tune/.Tune-Ok、Connection.Open/.Open-Ok这6个命令的交互。
当客户端调用connection.createChannel方法准备开启信道的时候,其包装Channel.Open命令发送给Broker,等待Channel.Open-Ok命令。
当客户端发送消息的时候,需要调用channel.basicPublish方法,对应的AMQP命令为Basic.Publish,注意这个命令和前面涉及的命令略有不同,这个命令还包含了Content Header和ContentBody。ContentHeader里面包含的是消息体的属性,例如投递模式、优先级等,而ContentBody包含消息体本身。
当客户端发送完消息需要关闭资源时,涉及Channel.Close/.Close-Ok与Connection.Close/.Close-Ok的命令交互。
消费者建立连接和开启信道同生产者一样,不再赘述。
如果在消费之前调用了channel.basicQos(int prefetchCount)的方法来设置消费者客户端最大能"保持"的未确认的消息数,那么协议流转会涉及Basic.Qos/.Qos-Ok这两个AMQP命令。
在真正消费之前,消费者客户端需要向Broker发送 Basic.Consume命令(即调用channel.basicConsume方法)将Channel置为接收模式,之后Broker回执Basic.Consume-Ok以告诉消费者准备好消费消息。
紧接着Broker向消费者客户端推送(Push)消息,即Basic.Deliver命令,有意思的是这个和Basic.Publish命令一样会携带ContentHeader和ContentBody 消费者接收到消息并正确消费之后,向Broker发送确认,即Basic.Ack命令。
在消费者停止消费的时候,主动关闭连接,这点和生产者一样,涉及Channel.Close/.Close-Ok和Connection.Close/.Close-Ok。
Connection可以用来创建多个Channel实例,但是Channel实例不能在线程间共享,应用程序应该为每一个线程开辟一个Channel。 某些情况下Channel的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认机制的运行,所以多线程间Channel实例是非线程安全的。
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
exchangeDeclareNoWait在声明交换器的时候不需要等待服务器返回,假设服务器还没创建交换器成功,接着便使用了交换器,将会抛出异常。
exchangeDeclarePassive要用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常:404 channel exception,同时Channel也会被关闭。
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道直为"传输"模式,之后才能声明队列。
queueDeclareNoWait创建队列不需要服务端的任何返回,紧接着使用声明的队列时有可能会发生异常情况。
queueDeclarePassive用来检测相应的队列是否存在。如果存在则正常返回,如果不存在则抛出异常:404 channel exception,同时Channel也会被关闭。
Queue.DeleteOk queueDelete(String queue, boolean ifUnused,boolean ifEmpty) throws IOException;
Queue.PurgeOk queuePurge(String queue) throws IOException;
不删除队列,只清空队列内容。
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
生产者将消息发送到destination交换器,RabbitMQ服务根据路由键将消息转发到source路由器,进而存储在destination绑定的队列queue中。
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
RabbitMQ 3.0以后取消对immediate参数的支持,禁止设置为true。
生产者可以通过调用channel.addReturnListener来添加ReturnListener监听器的实现获取到没有被正确路由到合适队列的消息。
消费消息分为推模式和拉模式
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
public interface Consumer { void handleConsumeOk(String consumerTag); void handleCancelOk(String consumerTag); void handleCancel(String consumerTag) throws IOException; void handleShutdownSignal(String consumerTag, ShutdownSignalException sig); void handleRecoverOk(String consumerTag); void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;}
消费者客户端的这些callback会被分配到与Channel不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,比如 channel.queueDeclare、channel.basicCancel等。
从消息队列中主动拉取一条message
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
RabbitMQ服务提供了消息确认机制,将autoAck参数设置为false,消费者可以有足够的时间处理消息。
void basicAck(long deliveryTag, boolean multiple) throws IOException;
void basicReject(long deliveryTag, boolean requeue) throws IOException;
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
批量拒绝消息
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
该方法用来请求RabbitMQ重新发送还未被确认的消息。如果requeue参数设置为true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。 如果requeue参数设置为false,那么同一条消息会被分配给与之前相同的消费者。默认情况下,如果不设置requeue这个参数,相当于channel.basicRecover(true),即requeue默认为true。
Connection和Channel所具备的生命周期如下所述:
可以为Connection和Channel添加一个ShutdownListener,当Connection和Channel关闭以后,将会通知ShutdownListener监听器。
Connection和Channel的getCloseReason方法可以获取关闭的原因。
@FunctionalInterfacepublic interface ShutdownListener extends EventListener { void shutdownCompleted(ShutdownSignalException cause);}
cause的isHardError的方法如果返回true代表是Connection的错误,如果是false是Channel的错误。
cause的getReference可以获取Connection或者Channel。