专栏首页shysh95RabbitMQ基础使用

RabbitMQ基础使用

本篇文章针对RabbitMQ入门级使用,主要由有以下两个知识点:

1. RabbitMQ中的角色以及相关概念

2. RabbitMQ的基本API介绍

基本概念

上面一张图概括了消息的流转过程

生产者

生产消息的应用,生产者需要指定将消息发送到哪个exchange,并且指定routingkey(这是为了exchange可以将消息路由到相关的队列)。

消息由两部分组成:标签和消息体。标签指的就是exchange、routingkey、ttl等一系列用来描述该消息的集合,消息体就是消息的实际内容。

Broker

这里指的就是RabbitMQ服务实例

消费者

消费者需要订阅队列,消费消息的时候只会看到消息体的内容,标签在路由过程中会被丢弃。

队列

队列在RabbitMQ中是用来存储消息的(Kafka的消息存储级别在topic,所谓的队列其实只是topic存储文件中消息偏移量的标识)。

队列中的消息会被平摊给多个消费者,一条消息只会发给其中一个消费者。

交换器、路由键、绑定键

这在RabbitMQ中是一层抽象的东西,并不实际存在。交换器用来接收消息,并且将消息路由到一个或多个队列。

路由键:生产者在发送消息的时候会指定消息的路由规则,只有路由键符合绑定键时,消息才能正确的从交换器路由到队列。

绑定键指的是交换器和队列是如何绑定的,RabbitMQ才能知道消息怎样路由到队列。绑定键有时候是无效的,需要依赖于交换器的类型(fanout的交换器就会忽略绑定键)

交换器类型

fanout:该交换器会将消息路由到所有与该交换器绑定的队列

direct:交换器会将消息路由到BindingKey和RoutingKey完全一致的队列

topic:交换器会将消息根据一定的路由规则路由到相关队列,该队列约束如下

  • RoutingKey以.分割,每一个被.分割的都是一个单词
  • BindingKey也是.分割的字符串
  • BindingKey中存在*和#来进行模糊匹配,*匹配一个单词,#匹配任意多个单词

header:该交换器不依赖于路由键的匹配规则来路由消息,在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式)对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列 。不建议使用该类型的交换器。

使用步骤

以下流程都是通俗的流程,实际的操作流程应因项目和公司规范来实际操作。

生产者和消费者和RabbitMQ进行连接即为一个Connection(TCP连接),生产者、消费者和RabbitMQ之间的数据传输都是通过Channel来进行,因此需要建立一个Channel。

Channel可以理解为基于一个TCP连接而建立的多个虚拟连接。

生产者

  1. 和RabbitMQ服务创建连接(Connection),开启一个信道(Channel)
  2. 声明交换器,同时声明交换器的属性(持久性、排他性、类型等信息)
  3. 声明队列,同时声明队列的属性(持久性、排他性、是否自动删除等信息)
  4. 使用BindingKey将队列和交换器进行绑定
  5. 发送消息到交换器,消息除了内容还可以指定消息的属性(是否持久化、消息内容的类型、编码等信息)
  6. 相应的交换器根据接收到的路由键查找相匹配的队列。如果找到,则将从 生产者发送过来的消息存入相应的队列中;如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  7. 关闭Channel、关闭Connection

消费者

  1. 和RabbitMQ服务创建连接(Connection),开启一个信道(Channel)
  2. 订阅队列,准备消费消息
  3. 等待RabbitMQ服务推送消息
  4. 消费者接收消息,处理完成之后进行确认(ack)
  5. RabbitMQ服务将已确认的消息从队列中移除
  6. 关闭Channel和Connection或者持续消费

引入Channel

假设一个应用程序有很多个线程需要从RabbitMQ消费或者生产消息,如果一个线程一个TCP连接,将会产生很大的性能问题。通过信道可以实现TCP连接复用,减少性能开销。

每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection,将这些信道均摊到这些Connection中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。

AMQP协议

RabbitMQ就是AMQP协议的Erlang的实现。AMQP协议具体可以分为三层:

  1. Module Layer:位于协议的最高层,定义了根客户端调用相关的命令,客户端可以利用这些命令实现自己的业务逻辑
  2. Session Layer:位于中间层,将客户端命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
  3. Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

生产者

当客户端与Broker建立连接的时候,会调用factory.newConnection方法,这个方法会进一步封装成Protocol Header 0-9-1的报文头发送给Broker,以此通知Broker本次交互采用的是AMQPO-9-1协议,紧接着Broker返回Connection.Start来建立连接,在连接的过程中涉及Connection.Start/.Start-OK、Connection.Tune/.Tune-Ok、Connection.Open/.Open-Ok这6个命令的交互。

当客户端调用connection.createChannel方法准备开启信道的时候,其包装Channel.Open命令发送给Broker,等待Channel.Open-Ok命令。

当客户端发送消息的时候,需要调用channel.basicPublish方法,对应的AMQP命令为Basic.Publish,注意这个命令和前面涉及的命令略有不同,这个命令还包含了Content Header和ContentBody。ContentHeader里面包含的是消息体的属性,例如投递模式、优先级等,而ContentBody包含消息体本身。

当客户端发送完消息需要关闭资源时,涉及Channel.Close/.Close-Ok与Connection.Close/.Close-Ok的命令交互。

消费者

消费者建立连接和开启信道同生产者一样,不再赘述。

如果在消费之前调用了channel.basicQos(int prefetchCount)的方法来设置消费者客户端最大能"保持"的未确认的消息数,那么协议流转会涉及Basic.Qos/.Qos-Ok这两个AMQP命令。

在真正消费之前,消费者客户端需要向Broker发送 Basic.Consume命令(即调用channel.basicConsume方法)将Channel置为接收模式,之后Broker回执Basic.Consume-Ok以告诉消费者准备好消费消息。

紧接着Broker向消费者客户端推送(Push)消息,即Basic.Deliver命令,有意思的是这个和Basic.Publish命令一样会携带ContentHeader和ContentBody 消费者接收到消息并正确消费之后,向Broker发送确认,即Basic.Ack命令。

在消费者停止消费的时候,主动关闭连接,这点和生产者一样,涉及Channel.Close/.Close-Ok和Connection.Close/.Close-Ok。

基本API

Connection

Connection可以用来创建多个Channel实例,但是Channel实例不能在线程间共享,应用程序应该为每一个线程开辟一个Channel。 某些情况下Channel的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认机制的运行,所以多线程间Channel实例是非线程安全的。

交换器exchange

    Exchange.DeclareOk exchangeDeclare(String exchange,                                              String type,                                              boolean durable,                                              boolean autoDelete,                                              boolean internal,                                              Map<String, Object> arguments) throws IOException;
  1. exchange:交换器名称
  2. type:交换器类型
  3. durable:交换器是否可持久化。开启持久化可将交换器落地磁盘。
  4. autoDelete:交换器是否自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
  5. internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  6. arguments:其他一些结构化参数,比如alternate-exchange

exchangeDeclareNoWait在声明交换器的时候不需要等待服务器返回,假设服务器还没创建交换器成功,接着便使用了交换器,将会抛出异常。

exchangeDeclarePassive要用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常:404 channel exception,同时Channel也会被关闭。

Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
  1. exchange:交换器名称
  2. ifUnused:设置为true,只有当交换器没被使用的时候才会被删除,如果为false直接删除。

队列Queue

    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,                                 Map<String, Object> arguments) throws IOException;
  1. queue:队列名称
  2. durable:队列是否持久化。开启持久化队列落地磁盘,在服务器重启的时候可以保证不丢失相关信息。
  3. exclusive:队列是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接(Connection)可见的, 同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列; "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的, 这个与普通队列不同:即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
  4. autoDelete:队列是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
  5. arguments:设置队列的其他一些参数,如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead­letter-routing-key、x-max-priority等。

生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道直为"传输"模式,之后才能声明队列。

queueDeclareNoWait创建队列不需要服务端的任何返回,紧接着使用声明的队列时有可能会发生异常情况。

queueDeclarePassive用来检测相应的队列是否存在。如果存在则正常返回,如果不存在则抛出异常:404 channel exception,同时Channel也会被关闭。

Queue.DeleteOk queueDelete(String queue, boolean ifUnused,boolean ifEmpty) throws IOException;
  1. queue:队列名称
  2. ifUnused:设置为true,只有当队列没被使用的时候才会被删除,如果为false直接删除。
  3. ifEmpty:设置为true,只有当队列为空的时候才删除
Queue.PurgeOk queuePurge(String queue) throws IOException;

不删除队列,只清空队列内容。

队列绑定queueBind

Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
  1. queue:队列名称
  2. exchange:交换器名称
  3. routingKey:绑定键
  4. arguments:绑定的一些参数

交换器绑定exchangeBind

Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
  1. destination:源交换器
  2. source:目的交换器
  3. routingKey:绑定键
  4. arguments:绑定的一些参数

生产者将消息发送到destination交换器,RabbitMQ服务根据路由键将消息转发到source路由器,进而存储在destination绑定的队列queue中。

发送消息basicPublish

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)            throws IOException;
  1. exchange:交换器名称
  2. routingKey:路由键
  3. mandatory:当该参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当该参数设置为false时,则消息直接被丢弃 。
  4. immediate:该参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
  5. props:消息的基本属性集,其包含14个属性成员,分别有contentType、contentEncoding、headers(Map)、deliveryMode、priority、correlationId、replyTo、 expiration、messageId、timestamp、type、userId、appId、clusterId
  6. body:消息体,真正发送的消息

RabbitMQ 3.0以后取消对immediate参数的支持,禁止设置为true。

生产者可以通过调用channel.addReturnListener来添加ReturnListener监听器的实现获取到没有被正确路由到合适队列的消息。

消费消息

消费消息分为推模式和拉模式

推模式

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
  1. queue:队列名称
  2. autoAck:是否自动确认,建议关闭自动确认
  3. consumerTag:消费者标签,用来区分多个消费者
  4. noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
  5. exclusive:设置是否排他
  6. arguments:设置消费者的其他参数
  7. callback:设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如DefaultConsumer,使用时需要客户端重写(override)其中的方法。
public interface Consumer {    void handleConsumeOk(String consumerTag);    void handleCancelOk(String consumerTag);    void handleCancel(String consumerTag) throws IOException;    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);    void handleRecoverOk(String consumerTag);    void handleDelivery(String consumerTag,                        Envelope envelope,                        AMQP.BasicProperties properties,                        byte[] body)        throws IOException;}
  1. handleShutdownSignal:当Channel或者Connection关闭的时候会调用。
  2. handleConsumeOk:会在其他方法之前调用,返回消费者标签
  3. handleCancelOk:消费端可以在显式地或者隐式地取消订阅的时候调用
  4. handleCancel:消费端可以在显式地或者隐式地取消订阅的时候调用
  5. handleDelivery:用来处理队列推送的消息

消费者客户端的这些callback会被分配到与Channel不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,比如 channel.queueDeclare、channel.basicCancel等。

拉模式

从消息队列中主动拉取一条message

GetResponse basicGet(String queue, boolean autoAck) throws IOException;
  1. queue:队列名称
  2. autoAck:是否自动确认

消费确认和拒绝

RabbitMQ服务提供了消息确认机制,将autoAck参数设置为false,消费者可以有足够的时间处理消息。

void basicAck(long deliveryTag, boolean multiple) throws IOException;
  1. deliveryTag:消息编号,64位的长整型值,最大值是9223372036854775807。
  2. multiple:true:确认该消息编号之前的所有消息,false:只确认该消息编号的消息。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
  1. deliveryTag:消息编号,64位的长整型值,最大值是9223372036854775807。
  2. requeue:是否重入队列。true:重新入队,false:直接删除消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

批量拒绝消息

  1. deliveryTag:消息编号,64位的长整型值,最大值是9223372036854775807。
  2. requeue:是否重入队列。true:重新入队,false:直接删除消息
  3. multiple:true:拒绝该消息编号之前的所有消息,false:只拒绝该消息编号的消息。
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

该方法用来请求RabbitMQ重新发送还未被确认的消息。如果requeue参数设置为true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。 如果requeue参数设置为false,那么同一条消息会被分配给与之前相同的消费者。默认情况下,如果不设置requeue这个参数,相当于channel.basicRecover(true),即requeue默认为true。

关闭连接

Connection和Channel所具备的生命周期如下所述:

  • open:开启状态,代表该对象可用
  • closing:正在关闭状态,当前对象被显式地通知调用关闭方法,这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成。
  • closed:关闭状态。当前对象己经接收到所有的内部对象己完成关闭动作的通知,并且其也关闭了自身。

可以为Connection和Channel添加一个ShutdownListener,当Connection和Channel关闭以后,将会通知ShutdownListener监听器。

Connection和Channel的getCloseReason方法可以获取关闭的原因。

@FunctionalInterfacepublic interface ShutdownListener extends EventListener {    void shutdownCompleted(ShutdownSignalException cause);}

cause的isHardError的方法如果返回true代表是Connection的错误,如果是false是Channel的错误。

cause的getReference可以获取Connection或者Channel。

本文分享自微信公众号 - shysh95(shysh95),作者:shysh95

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-04-02

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RabbitMQ进阶使用

    在入门使用曾提到过,生产者发送消息可以使用mandatory参数,该参数的作用主要是在交换器根据路由键无法匹配队列的时候讲消息返回给生产者,但是需要生产者通过R...

    shysh95
  • RabbitMQ Federation

    在文章开始之前,我们先介绍一下联邦机制的基本概念。联邦机制的实现,依赖于RabbitMQ的Federation插件,该插件的主要目标是为了RabbitMQ可以在...

    shysh95
  • RabbitMQ镜像队列

    镜像队列主要有两种类型:master和slave。master和slave节点位于同一个集群中。master只要一个节点,slave可以有多个节点。

    shysh95
  • RabbitMQ快速入门

    最近一段项目实践中大量使用了基于RabbitMQ的消息中间件,也积累的一些经验和思考,特此成文,望大家不吝赐教。 本文包括RabbitMQ基本概念、进阶概念、...

    用户1216676
  • springboot + 消息队列

    第一种:用户注册信息写入数据库后在按照顺序先后发送注册邮件和短信,走完这三步后用户才完成注册

    桑鱼
  • RabbitMQ消息通信

    ---- 概述 RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据或者将作业排队以便让分布式服务器进行处理。应用程序...

    BrianLv
  • [045][译]cfq-iosched.txt

    按照[043][译]blkio-controller.txt,我已经学会了如何通过cgroup v1来调整不同进程的IO权重,这个IO权重是在CFQ调度算法中实...

    王小二
  • CKafka系列学习文章 - 什么是消息队列 ?(一)

    | 导语 在大家的工作当中,是否碰到大量的插入、更新请求同时到达数据库,这会导致行或表被锁住,最后会因为请求堆积过多而触发“连接数过多的异常”(Too Man...

    发哥说消息队列
  • Java中的 Threadpoolexecutor类

    在之前的文章Java中executors提供的的4种线程池中,学习了一下Executors类中提供的四种线程池.

    呼延十
  • 技术选型 | 常用消息中间件17个维度全方位对比

    本文将从,Kafka、RabbitMQ、ZeroMQ、RocketMQ、ActiveMQ 17 个方面综合对比作为消息队列使用时的差异。

    用户1516716

扫码关注云+社区

领取腾讯云代金券