前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ基础使用

RabbitMQ基础使用

作者头像
shysh95
发布2019-07-23 09:53:39
1.1K0
发布2019-07-23 09:53:39
举报
文章被收录于专栏:shysh95shysh95

本篇文章针对RabbitMQ入门级使用,主要由有以下两个知识点:

1. RabbitMQ中的角色以及相关概念

2. RabbitMQ的基本API介绍

基本概念

上面一张图概括了消息的流转过程

生产者

生产消息的应用,生产者需要指定将消息发送到哪个exchange,并且指定routingkey(这是为了exchange可以将消息路由到相关的队列)。

消息由两部分组成:标签和消息体。标签指的就是exchange、routingkey、ttl等一系列用来描述该消息的集合,消息体就是消息的实际内容。

Broker

这里指的就是RabbitMQ服务实例

消费者

消费者需要订阅队列,消费消息的时候只会看到消息体的内容,标签在路由过程中会被丢弃。

队列

队列在RabbitMQ中是用来存储消息的(Kafka的消息存储级别在topic,所谓的队列其实只是topic存储文件中消息偏移量的标识)。

队列中的消息会被平摊给多个消费者,一条消息只会发给其中一个消费者。

交换器、路由键、绑定键

这在RabbitMQ中是一层抽象的东西,并不实际存在。交换器用来接收消息,并且将消息路由到一个或多个队列。

路由键:生产者在发送消息的时候会指定消息的路由规则,只有路由键符合绑定键时,消息才能正确的从交换器路由到队列。

绑定键指的是交换器和队列是如何绑定的,RabbitMQ才能知道消息怎样路由到队列。绑定键有时候是无效的,需要依赖于交换器的类型(fanout的交换器就会忽略绑定键)

交换器类型

fanout:该交换器会将消息路由到所有与该交换器绑定的队列

direct:交换器会将消息路由到BindingKey和RoutingKey完全一致的队列

topic:交换器会将消息根据一定的路由规则路由到相关队列,该队列约束如下

  • RoutingKey以.分割,每一个被.分割的都是一个单词
  • BindingKey也是.分割的字符串
  • BindingKey中存在*和#来进行模糊匹配,*匹配一个单词,#匹配任意多个单词

header:该交换器不依赖于路由键的匹配规则来路由消息,在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式)对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列 。不建议使用该类型的交换器。

使用步骤

以下流程都是通俗的流程,实际的操作流程应因项目和公司规范来实际操作。

生产者和消费者和RabbitMQ进行连接即为一个Connection(TCP连接),生产者、消费者和RabbitMQ之间的数据传输都是通过Channel来进行,因此需要建立一个Channel。

Channel可以理解为基于一个TCP连接而建立的多个虚拟连接。

生产者
  1. 和RabbitMQ服务创建连接(Connection),开启一个信道(Channel)
  2. 声明交换器,同时声明交换器的属性(持久性、排他性、类型等信息)
  3. 声明队列,同时声明队列的属性(持久性、排他性、是否自动删除等信息)
  4. 使用BindingKey将队列和交换器进行绑定
  5. 发送消息到交换器,消息除了内容还可以指定消息的属性(是否持久化、消息内容的类型、编码等信息)
  6. 相应的交换器根据接收到的路由键查找相匹配的队列。如果找到,则将从 生产者发送过来的消息存入相应的队列中;如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  7. 关闭Channel、关闭Connection
消费者
  1. 和RabbitMQ服务创建连接(Connection),开启一个信道(Channel)
  2. 订阅队列,准备消费消息
  3. 等待RabbitMQ服务推送消息
  4. 消费者接收消息,处理完成之后进行确认(ack)
  5. RabbitMQ服务将已确认的消息从队列中移除
  6. 关闭Channel和Connection或者持续消费
引入Channel

假设一个应用程序有很多个线程需要从RabbitMQ消费或者生产消息,如果一个线程一个TCP连接,将会产生很大的性能问题。通过信道可以实现TCP连接复用,减少性能开销。

每个线程把持一个信道,所以信道复用了Connection的TCP连接。同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个Connection,将这些信道均摊到这些Connection中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。

AMQP协议

RabbitMQ就是AMQP协议的Erlang的实现。AMQP协议具体可以分为三层:

  1. Module Layer:位于协议的最高层,定义了根客户端调用相关的命令,客户端可以利用这些命令实现自己的业务逻辑
  2. Session Layer:位于中间层,将客户端命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
  3. Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

生产者

当客户端与Broker建立连接的时候,会调用factory.newConnection方法,这个方法会进一步封装成Protocol Header 0-9-1的报文头发送给Broker,以此通知Broker本次交互采用的是AMQPO-9-1协议,紧接着Broker返回Connection.Start来建立连接,在连接的过程中涉及Connection.Start/.Start-OK、Connection.Tune/.Tune-Ok、Connection.Open/.Open-Ok这6个命令的交互。

当客户端调用connection.createChannel方法准备开启信道的时候,其包装Channel.Open命令发送给Broker,等待Channel.Open-Ok命令。

当客户端发送消息的时候,需要调用channel.basicPublish方法,对应的AMQP命令为Basic.Publish,注意这个命令和前面涉及的命令略有不同,这个命令还包含了Content Header和ContentBody。ContentHeader里面包含的是消息体的属性,例如投递模式、优先级等,而ContentBody包含消息体本身。

当客户端发送完消息需要关闭资源时,涉及Channel.Close/.Close-Ok与Connection.Close/.Close-Ok的命令交互。

消费者

消费者建立连接和开启信道同生产者一样,不再赘述。

如果在消费之前调用了channel.basicQos(int prefetchCount)的方法来设置消费者客户端最大能"保持"的未确认的消息数,那么协议流转会涉及Basic.Qos/.Qos-Ok这两个AMQP命令。

在真正消费之前,消费者客户端需要向Broker发送 Basic.Consume命令(即调用channel.basicConsume方法)将Channel置为接收模式,之后Broker回执Basic.Consume-Ok以告诉消费者准备好消费消息。

紧接着Broker向消费者客户端推送(Push)消息,即Basic.Deliver命令,有意思的是这个和Basic.Publish命令一样会携带ContentHeader和ContentBody 消费者接收到消息并正确消费之后,向Broker发送确认,即Basic.Ack命令。

在消费者停止消费的时候,主动关闭连接,这点和生产者一样,涉及Channel.Close/.Close-Ok和Connection.Close/.Close-Ok。

基本API

Connection

Connection可以用来创建多个Channel实例,但是Channel实例不能在线程间共享,应用程序应该为每一个线程开辟一个Channel。 某些情况下Channel的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认机制的运行,所以多线程间Channel实例是非线程安全的。

交换器exchange

代码语言:javascript
复制
    Exchange.DeclareOk exchangeDeclare(String exchange,                                              String type,                                              boolean durable,                                              boolean autoDelete,                                              boolean internal,                                              Map<String, Object> arguments) throws IOException;
  1. exchange:交换器名称
  2. type:交换器类型
  3. durable:交换器是否可持久化。开启持久化可将交换器落地磁盘。
  4. autoDelete:交换器是否自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
  5. internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  6. arguments:其他一些结构化参数,比如alternate-exchange

exchangeDeclareNoWait在声明交换器的时候不需要等待服务器返回,假设服务器还没创建交换器成功,接着便使用了交换器,将会抛出异常。

exchangeDeclarePassive要用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常:404 channel exception,同时Channel也会被关闭。

代码语言:javascript
复制
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
  1. exchange:交换器名称
  2. ifUnused:设置为true,只有当交换器没被使用的时候才会被删除,如果为false直接删除。

队列Queue

代码语言:javascript
复制
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,                                 Map<String, Object> arguments) throws IOException;
  1. queue:队列名称
  2. durable:队列是否持久化。开启持久化队列落地磁盘,在服务器重启的时候可以保证不丢失相关信息。
  3. exclusive:队列是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接(Connection)可见的, 同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列; "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的, 这个与普通队列不同:即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
  4. autoDelete:队列是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
  5. arguments:设置队列的其他一些参数,如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead­letter-routing-key、x-max-priority等。

生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道直为"传输"模式,之后才能声明队列。

queueDeclareNoWait创建队列不需要服务端的任何返回,紧接着使用声明的队列时有可能会发生异常情况。

queueDeclarePassive用来检测相应的队列是否存在。如果存在则正常返回,如果不存在则抛出异常:404 channel exception,同时Channel也会被关闭。

代码语言:javascript
复制
Queue.DeleteOk queueDelete(String queue, boolean ifUnused,boolean ifEmpty) throws IOException;
  1. queue:队列名称
  2. ifUnused:设置为true,只有当队列没被使用的时候才会被删除,如果为false直接删除。
  3. ifEmpty:设置为true,只有当队列为空的时候才删除
代码语言:javascript
复制
Queue.PurgeOk queuePurge(String queue) throws IOException;

不删除队列,只清空队列内容。

队列绑定queueBind

代码语言:javascript
复制
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
  1. queue:队列名称
  2. exchange:交换器名称
  3. routingKey:绑定键
  4. arguments:绑定的一些参数

交换器绑定exchangeBind

代码语言:javascript
复制
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
  1. destination:源交换器
  2. source:目的交换器
  3. routingKey:绑定键
  4. arguments:绑定的一些参数

生产者将消息发送到destination交换器,RabbitMQ服务根据路由键将消息转发到source路由器,进而存储在destination绑定的队列queue中。

发送消息basicPublish

代码语言:javascript
复制
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)            throws IOException;
  1. exchange:交换器名称
  2. routingKey:路由键
  3. mandatory:当该参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当该参数设置为false时,则消息直接被丢弃 。
  4. immediate:该参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
  5. props:消息的基本属性集,其包含14个属性成员,分别有contentType、contentEncoding、headers(Map)、deliveryMode、priority、correlationId、replyTo、 expiration、messageId、timestamp、type、userId、appId、clusterId
  6. body:消息体,真正发送的消息

RabbitMQ 3.0以后取消对immediate参数的支持,禁止设置为true。

生产者可以通过调用channel.addReturnListener来添加ReturnListener监听器的实现获取到没有被正确路由到合适队列的消息。

消费消息

消费消息分为推模式和拉模式

推模式
代码语言:javascript
复制
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
  1. queue:队列名称
  2. autoAck:是否自动确认,建议关闭自动确认
  3. consumerTag:消费者标签,用来区分多个消费者
  4. noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
  5. exclusive:设置是否排他
  6. arguments:设置消费者的其他参数
  7. callback:设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如DefaultConsumer,使用时需要客户端重写(override)其中的方法。
代码语言:javascript
复制
public interface Consumer {    void handleConsumeOk(String consumerTag);    void handleCancelOk(String consumerTag);    void handleCancel(String consumerTag) throws IOException;    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);    void handleRecoverOk(String consumerTag);    void handleDelivery(String consumerTag,                        Envelope envelope,                        AMQP.BasicProperties properties,                        byte[] body)        throws IOException;}
  1. handleShutdownSignal:当Channel或者Connection关闭的时候会调用。
  2. handleConsumeOk:会在其他方法之前调用,返回消费者标签
  3. handleCancelOk:消费端可以在显式地或者隐式地取消订阅的时候调用
  4. handleCancel:消费端可以在显式地或者隐式地取消订阅的时候调用
  5. handleDelivery:用来处理队列推送的消息

消费者客户端的这些callback会被分配到与Channel不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,比如 channel.queueDeclare、channel.basicCancel等。

拉模式

从消息队列中主动拉取一条message

代码语言:javascript
复制
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
  1. queue:队列名称
  2. autoAck:是否自动确认

消费确认和拒绝

RabbitMQ服务提供了消息确认机制,将autoAck参数设置为false,消费者可以有足够的时间处理消息。

代码语言:javascript
复制
void basicAck(long deliveryTag, boolean multiple) throws IOException;
  1. deliveryTag:消息编号,64位的长整型值,最大值是9223372036854775807。
  2. multiple:true:确认该消息编号之前的所有消息,false:只确认该消息编号的消息。
代码语言:javascript
复制
void basicReject(long deliveryTag, boolean requeue) throws IOException;
  1. deliveryTag:消息编号,64位的长整型值,最大值是9223372036854775807。
  2. requeue:是否重入队列。true:重新入队,false:直接删除消息
代码语言:javascript
复制
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

批量拒绝消息

  1. deliveryTag:消息编号,64位的长整型值,最大值是9223372036854775807。
  2. requeue:是否重入队列。true:重新入队,false:直接删除消息
  3. multiple:true:拒绝该消息编号之前的所有消息,false:只拒绝该消息编号的消息。
代码语言:javascript
复制
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

该方法用来请求RabbitMQ重新发送还未被确认的消息。如果requeue参数设置为true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。 如果requeue参数设置为false,那么同一条消息会被分配给与之前相同的消费者。默认情况下,如果不设置requeue这个参数,相当于channel.basicRecover(true),即requeue默认为true。

关闭连接

Connection和Channel所具备的生命周期如下所述:

  • open:开启状态,代表该对象可用
  • closing:正在关闭状态,当前对象被显式地通知调用关闭方法,这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成。
  • closed:关闭状态。当前对象己经接收到所有的内部对象己完成关闭动作的通知,并且其也关闭了自身。

可以为Connection和Channel添加一个ShutdownListener,当Connection和Channel关闭以后,将会通知ShutdownListener监听器。

Connection和Channel的getCloseReason方法可以获取关闭的原因。

代码语言:javascript
复制
@FunctionalInterfacepublic interface ShutdownListener extends EventListener {    void shutdownCompleted(ShutdownSignalException cause);}

cause的isHardError的方法如果返回true代表是Connection的错误,如果是false是Channel的错误。

cause的getReference可以获取Connection或者Channel。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-04-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员修炼笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基本概念
    • 生产者
      • Broker
        • 消费者
          • 队列
            • 交换器、路由键、绑定键
              • 交换器类型
            • 使用步骤
              • 生产者
              • 消费者
              • 引入Channel
          • AMQP协议
            • 生产者
              • 消费者
              • 基本API
                • Connection
                  • 交换器exchange
                    • 队列Queue
                      • 队列绑定queueBind
                        • 交换器绑定exchangeBind
                          • 发送消息basicPublish
                            • 消费消息
                              • 推模式
                              • 拉模式
                            • 消费确认和拒绝
                              • 关闭连接
                              相关产品与服务
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档