RabbitMQ快速入门

最近一段项目实践中大量使用了基于RabbitMQ的消息中间件,也积累的一些经验和思考,特此成文,望大家不吝赐教。 本文包括RabbitMQ基本概念、进阶概念、实践与思考等三部分,着重强调相关概念基于RabbitMQ进行扩展开发的思路,并简要展示RabbitMQ客户端的编码,接下来通过一个思维导图来展示整体思路,红星表示重点部分。

1.基本概念

官方文档: http://www.rabbitmq.com/#getstarted

1.1.核心实体

进入详细介绍之前,先来看一张简化版的消息流转的模型图。

  • a.生产者Producer发送消息,消息分为消息标识和消息体(payLoad有效载荷)两部分,消息标识中包含Exchange交换器的名字和RoutingKey路由键信息。
  • b.由于交换器之前已经通过2个不同的BindingKey绑定键分别绑定了两个队列,因此交换器可以对比路由键和绑定键,之后将消息路由到匹配的队列中。
  • c.队列将消息推送到指定的一个或多个消费者中,多个消费者会选择最简单的RoundRobin轮训方式进行选择。

Exchange交换器

核心概念,可以简化理解为路由器,其不存储数据,其通常会和一个队列绑定,但也可以绑定到另一个交换器上。其包括4种类型的交换器类型,生产实践中主要使用可以精细管理的direct和topic两种。direct,路由规则为完全匹配;topic,支持完全匹配,也支持模糊匹配;fanout,会将消息转发到该交换器绑定的所有队列中;header,实际中无应用。

Queue队列

用于存储消息,和Kafka的消息模型完全不同,其会将消息存储在Topic中。因此在实现类似ConsumerGroup概念时差异很大,Kafka是可以回溯消息的,但Rabbit新绑定的队列的数据是空的,不能回溯。

Binding绑定

其通过绑定键将交换器和队列关联起来

RouteKey & BindingKey 路由键和绑定键

通常会将路由键和绑定键都称为路由键,其差异是路由键是包含在消息标识中的,而绑定键是用于在交换器和队列间建立绑定关系的,消息会通过它们的匹配情况进行路由。

1.2.通信

通信实体

包括Connection连接和Channel通道,连接通常对应一个基于TCP的Socket,建立Connection的关键参数包括用户名、密码、虚拟主机、主机地址和端口。一个连接可以建立多个Channel实例,推荐控制数量(比如10个),但Channel实例不能在线程间共享,应用程序需要为每一个线程开辟一个Channel。

AMQP协议

Java技术栈汇中,关于消息通信听到比较多的是JMS,而AMQP协议相对更加严格一些,其包括Module Layer,Session Layer, Transport Layer三个层次,业务开发主要接触到的是Module Layer,客户端可以通过Queue.Declare、Basic.Consume等命令进行操作。

1.3.虚拟主机与用户

vhost

虚拟主机,可以在逻辑上看做一台RabbitMQ服务器,其拥有自己的交换器、队列和绑定关系等。RabbitMQ对权限的管理就是基于vhost进行的,默认会创建一个全局的/虚拟主机,通常不推荐直接使用该vhost,而是需要自定义一个vhost便于管理。

User 对于某一个用户,通常包括3种类型的权限:read,允许读取队列数据;write,允许向队列发送数据;config,允许创建队列,如果客户端需要支持添加队列,需要添加该权限,否则会报无权限错误【踩过坑】。

2.进阶概念

2.1.交换器与队列增强

TTL过期时间

目前在两个不同的粒度设置消息的TTL,分别是队列粒度和消息粒度。由于RabbitMQ实际机制的原因,通常都选择的是队列粒度,对于队列粒度来说,队列头的消息一定是最先失效的,因此可以高效的判断和丢弃。而对于消息粒度,其需要在消息真正投递到消费者时进行判断,如果该消息之前的消息并没有失效,那么它将一直存活。

死信交换器DLX

全称为 Dead-Letter-Exchange,也是RabbitMQ扩展开发的核心概念,当一条消息在一个队列中变成死信之后,它能自动的被转发到一个交换器中,这个交换器就是DLX,很多地方称和这个交换器绑定的队列是死信队列, 我并不是完全认同。

消息变为死信的原因

消息被拒绝Nack,Reject,并且requeue参数为false(重点强调一下,生产实践中通常不能打开requeue,因为打开后队列中的消息就会出现乱序的情况,且性能很差);消息过期;队列达到最大长度。 DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定 ,实际上就是设置某个队列的属性。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key" , "dlx-routing-key");
channel.exchangeDeclare("dlx_exchange" , "direct"); //创建 DLX: 
channel.queueDeclare("normal_queue", false, false, false, args); //为队列normal_queue添加 DLX

命名规范:队列类型,[生产者.消费者.队列名后缀];Topic类型,[生产者.exchange.队列名后缀]

延迟队列

延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。使用延迟消息场景如在订单系统中,希望用户下单后30分钟内支付,否则取消订单。那么业务系统可以在下单后,发送延迟消息,到达指定时间后消费该消息来判断是否支持。该方式在数据量比较大的场景中比通过Job扫描数据表合适。

在AMQP协议中,或者RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过前面 所介绍的DLX和TTL模拟出延迟队列的功能,这部分在实践与思考部分进行介绍。

持久化

交换器和队列元数据持久化和消息的持久化,消息的持久化可以直接使用MessageProperties.PERSISTENT_TEXT_PLAIN

2.2.生产者

生产者客户端的代码比较简洁,如下所示。

byte[] messageBodyBytes = "Hello , Xionger!".getBytes();
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,
        messageBodyBytes);

从高可用HA的角度不经要问,消息的生产者将消息发送出去之后,Broker是否收到消息。RabbitMQ针对这个问题提供了两种解决方案,分别是事务机制和发送方确认PublisherConfirm。发送者确认的实现继续细分为3种形式,包括单条同步、批量同步和异步方式。事务机制和单条同步确认方式的性能都比较差,通常只能达到2000QPS左右,因此通常推荐使用发送方确认的批量方式和异步方式,其QPS可以达到8000QPS以上。其中批量方式也存在一个隐患,即发送一批消息到服务端时,如果有一条消息失败,那么该批次所有消息都需要重试。因此目前生产实践中 ,使用的是异步方式,简化的代码实践如下所示。

SortedSet confirmSet = Sets.newTreeSet();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (multiple) {
            confirmSet.headSet(deliveryTag - 1);
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        //omit
        //消息重新投递处理
    }
});

tip: 这部分在服务端ack时有一个优化,只会回传当前最大的标识,可以有效减少比对次数。

2.3.消费者

消费模式:拉模式,推模式,RabbitMQ推荐推模式,保持消息消费的有序性。

boolean autoAck = false;
channel.basicQos(64);//prefetchCount
channel.basicConsume(queueName, autoAck, "myConsumerTag",
        new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String contentType = properties.getContentType();
                long deliveryTag = envelope.getDeliveryTag();
                channel.basicAck(deliveryTag, false);
            }
        });

tip:

对于消息生产者,过去还有一个消息投递不可达被返回的概念,涉及mandatory和immediate两个参数,但其在生产实践中并不常用。

3.实践与思考

3.1.环境搭建

安装:Mac环境 brew install rabbitmq,非常简便

管理界面

  • unacked: 消费端没有Ack的数量
  • Publish: 推送消息的QPS
  • Deliver(manual ack): 手动Ack
  • durable: 持久化
  • Policy: 队列的规则
  • Mirrors: 镜像Broker

3.2.Client组件开发

在介绍了RabbitMQ主要知识后,扩展的分享一个简易的基于RabbitMQ消息中间件的思路。由于RabbitMQ是基于Erlang开发,虽然很棒但毕竟比较小众,Java技术栈的工程师一般很难去修改RabbitMQ的源码,因此通常只是通过构建一个合理的客户端SDK来支持业务开发。

生产者

生产者目标比较简单,需要实现健壮性强的的发送者确认机制【异步】和支持队列分片,队列分片可以给队列加上后缀标识,然后轮训处理即可。

消费者

消费者部分希望支持消费失败的重试机制、死信队列及其报警机制,以支持3次重试消费为例,整体思路如下图所示。【借助之前介绍的TTL和DLX】

3.3.场景思考

RabbitMQ最大的特点是成熟度高,管理功能全面,近似开箱即用,二次开发实现一个简单靠谱的客户端就足以满足大部分的场景,尤其对于初创企业、中小企业来说是一个非常棒的选择。

  • 高可用HA:源生支持镜像服务器、同步模型等机制
  • 高吞吐:可以通过队列分片的方式支持大量的QPS,比如单个队列推荐QPS4000以下,健康的水位在2000左右,那么就需要通过二次开发客户端来实现队列分片。

参考资料

[1]朱忠华.RabbitMQ实战指南[M].电子工业出版社:北京,2017.11:.

下期预告:深入理解MySQL索引机制 善良比聪明更重要--张小龙

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券