抢票,商品秒杀
等功能是如何实现的,其实没有多么高大上,看了消息队列就知道了。“消息队列”是在消息的传输过程中保存消息的容器。
个人理解:我把它分成两个词消息
和队列
。当一大批客户端同时产生大量的网络请求(消息
)时候,服务器的承受能力肯定是有一个限制的。这时候要是有个容器,先让这些消息排队就好了,还好有个叫队列
的数据结构,通过有队列属性的容器
排队(先进先出),把消息再传到我们的服务器,压力减小了好多,这个很棒的容器
就是消息队列
这段理解中还包含这个两个概念: 客户端->生产者
服务器->消费者
当有消息队列
出现,生产者
和消费者
是必不可少的两个概念,上面的理解是多个生产者
对应一个消费者
,当然现实开发中还有许多消费者
的情况哦。接下来的文章也会多次提到生产-消费模型
。
控制消费者的处理速度
和生产者可进入消息队列的数量
等来避免峰值问题FIFO
(先进选出),在一些场景数据处理的顺序很重要,比如商品下单顺序等。解耦
,如果我们想增强消息入队和出队的处理频率,很简单,并不需要改变代码中任何内容,可以直接对消息队列修改一些配置即可,比如我们想限制每次发送给消费者的消息条数等。有优势定有它现实的应用场景,文章后面会针对优势讲它们对应的应用场景。
处理日志
的,可以看做是一个日志(消息)系统
一个重要组件,针对性很强。0.8 版本开始支持复制,不支持事务,因此对消息的重复、丢失、错误没有严格的要求。高并发
的特性,毋庸置疑,RabbitMQ 最高,原因是它的实现语言是天生具备高并发高可用的erlang 语言,天生的分布式
优势。说明:本文主要以RabbitMQ讲解,较为常见。个人认为这几种消息队列中间件能实现的功能,通过 redis 也都能实现,思想。
直接通过 HomeBrew 安装,执行以下命令
brew install rabbitmq
进入安装目录
$ /usr/local/Cellar/rabbitmq/3.7.8
启动
$ sbin/rabbitmq-server
浏览器输入 http://localhost:15672/#/ 默认用户名密码 guest
可视化界面可模块功能介绍:
其他系统安装请自行网上搜索
5672:通信默认端口号 15672:管理控制台默认端口号 25672:集群通信端口号 注意: 阿里云 ECS 服务器如果出现 RabbitMQ 安装成功,外网不能访问是因为安全组的问题没有开放端口 解决方案
以下列举一些在终端常用的操作命令
注意:以上终端所有命令,需要进入到rabbitmqctl的sbin目录下执行rabbitmqctl命令才有用,否则会报错:
画一张基本的图,HelloWorld 消息队列的图片,把下面几个概念都画进去。
看这段代码前先说几个概念
推荐一个 npm 模块amqplib
。
Github: https://github.com/squaremo/amqp.node
$ npm install amqplib
const amqp =require('amqplib');
async function product(params) {
// 1. 创建链接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 2. 获取通道
const channel = await connection.createChannel();
// 3. 声明参数
const routingKey = 'helloKoalaQueue';
const msg = 'hello koala';
for (let i=0; i<10000; i++) {
// 4. 发送消息
await channel.publish('', routingKey, Buffer.from(`${msg} 第${i}条消息`));
}
// 5. 关闭通道
await channel.close();
// 6. 关闭连接
await connect.close();
}
product();
执行 node product.js
代码注释中已经把基本的流程讲解了,但是我刚开始看的时候还有疑问,我想很多小伙伴也会有疑问,说明下:
生产者
发消息的时候必须要指定一个 exchange,若不指定 exchange(为空)会默认指向 AMQP default 交换机,AMQP default 路由规则是根据 routingKey 和 mq 上有没有相同名字的队列进行匹配路由。上面这段代码就是默认指定的交换机。不同类型交换机详细讲解请往下看。解耦
的特性node product.js
生产者代码,消息会堆积到交换机exchange
中,并不会覆盖,如果已执行过消费者并且确认了对应的消息队列
,消息会从exchange交换机
发送到消息队列
,并存入到消息队列
,等待消费者
消费
// 构建消费者
const amqp = require('amqplib');
async function consumer() {
// 1. 创建链接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 2. 获取通道
const channel = await connection.createChannel();
// 3. 声明参数
const queueName = 'helloKoalaQueue';
// 4. 声明队列,交换机默认为 AMQP default
await channel.assertQueue(queueName);
// 5. 消费
await channel.consume(queueName, msg => {
console.log('Consumer:', msg.content.toString());
channel.ack(msg);
});
}
consumer();
执行 node consumer.js
helloKoalaQueueHaHa
,这时候去看Rabbitmq可视化界面中,队列模块,创建了这个队列
看到这里再次证明了消息队列优秀的解耦特性
,消费者和生产者模型
之间没有任何联系,再次创建这个helloKoalaQueueHaHa
路由名称的生产者,消费者
也会正常消费,并且会打印消息,大家可以实际操作试一下。
helloKoalaQueueHaHa
,这时候去看Rabbitmq可视化界面中,队列模块,创建了这个队列
看到这里又再次证明了消息队列优秀的解耦特性
,消费者和生产者模型
之间没有任何联系,再次创建这个helloKoalaQueueHaHa
路由名称的生产者,消费者
也会正常消费,并且会打印消息,大家可以实际操作试一下。
弊端:这样只能一个队列一个队列的删除,如果队列中的消息过多就会特别慢。
先记住一句话
生产者发消息的时候必须指定一个 exchange,否则消息无法直接到达消息队列,Exchange将消息路由到一个或多个Queue中(或者丢弃)
然后开始本章节交换机的讲解
若不指定 exchange(为空)会默认指向 AMQP default 交换机,AMQP default 路由规则是根据 routingKey 和 mq 上有没有相同名字的队列进行匹配路由。
常用的四种类型
不管是哪一种类型的交换机,都有一个绑定binding的操作,只不过根据不同的交换机类型有不同的路由绑定策略。不同类型做的下图红色框框中的事。
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,不需要设置路由键。
上图中,上图中,生产者(Producter)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(consumer1与consumer2)消费。
说明:所有消息都会路由到两个Queue中,是两个消费者都可以收到全部的完全相同的消息吗? 答案是的,两个消费者收到的队列消息正常应该是完全相同的。这种类型常用于广播类型的需求,或者也可以消费者1记录日志 ,消费者2打印日志
对应代码实现:
生产者:
const amqp = require('amqplib');
async function producer() {
// 创建链接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 获取通道
const channel = await connection.createChannel();
// 声明参数
const exchangeName = 'fanout_koala_exchange';
const routingKey = '';
const msg = 'hello koala';
// 交换机
await channel.assertExchange(exchangeName, 'fanout', {
durable: true,
});
// 发送消息
await channel.publish(exchangeName, routingKey, Buffer.from(msg));
// 关闭链接
await channel.close();
await connection.close();
}
producer();
消费者:
const amqp = require('amqplib');
async function consumer() {
// 创建链接对象
const connection = await amqp.connect('amqp://localhost:5672');
// 获取通道
const channel = await connection.createChannel();
// 声明参数
const exchangeName = 'fanout_koala_exchange';
const queueName = 'fanout_kaola_queue';
const routingKey = '';
// 声明一个交换机
await channel.assertExchange(exchangeName, 'fanout', { durable: true });
// 声明一个队列
await channel.assertQueue(queueName);
// 绑定关系(队列、交换机、路由键)
await channel.bindQueue(queueName, exchangeName, routingKey);
// 消费
await channel.consume(queueName, msg => {
console.log('Consumer:', msg.content.toString());
channel.ack(msg);
});
console.log('消费端启动成功!');
}
consumer();
注意:其他类型代码已经放到 github,地址:https://github.com/koala-coding/simple_rabbitmq 欢迎 star 交流。
direct 把消息路由到那些 binding key与 routing key 完全匹配的 Queue中。
以上图的配置为例,我们以 routingKey=”error” 发送消息到Exchange,则消息会路由到 amq1 和 amq2;如果我们以 routingKey=”info” 或 routingKey=”warning” 来发送消息,则消息只会路由到 Queue2。如果我们以其他 routingKey 发送消息,则消息不会路由到这两个 Queue 中。
生产者指定 RoutingKey 消息根据消费端指定的队列通过模糊匹配的方式进行相应转发,两种通配符模式: #:可匹配一个或多个关键字 *:只能匹配一个关键字
header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的 header 数据。 主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值 header Exchange 类型用的比较少,可以自行 google 了解。
(本小段内容来源网上,参考文章说明)
RPC 远程调用服务端的方法,使用 MQ 可以实现 RPC 的异步调用,基于 Direct 交换机实现
(注意,这里只是提一下 RPC 这个知识,因为单单一个RPC一篇文章都不一定说说完,有兴趣的可以用队列尝试一下RPC)
消息队列
是存在内存中的,如果出现问题挂掉,消息队列中的消息会丢失。所以对于一些需求非常有持久化的必要!RabbitMQ 可以开启持久化。不同开发语言都可以设置持久化参数。
这里以Node.js为例子,其他语言可以自行搜索
await channel.assertExchange(exchangeName, 'direct', { durable: true });
// 注意其中的{ durable: true },这事对交换机持久化,还有其他的几种持久化方式
同时推荐一篇不错的写持久化的文章: https://juejin.im/post/5d6f6b0ae51d45621512add0
消息应答简单的解释就是消费者
完成了消费后,通知一下消息队列。
我觉得这个配置是有必要打开的,消费者完成消息队列中的任务,消费者可能中途失败或者挂掉,一旦 RabbitMQ 发送一个消息给消费者然后便迅速将该消息从消息队列内存
中移除,这种情况下,消费者对应工作进程失败或者挂掉后,那该进程正在处理的消息也将丢失。而且,也将丢失所有发送给该进程的未被处理的消息。
为了确保消息永不丢失,RabbitMQ 支持消息应答机制。当消息被接受,处理之后一条应答便会从消费者回传至发送方,然后RabbitMQ将其删除。
如果某个消费者挂掉(信道、链接关闭或者 tcp 链接丢失)且没有发送 ack 应答,RabbitMQ 会认为该消息没有被处理完全然后会将其重新放置到队列中。通过这种方式你就可以确保消息永不丢失,甚至某个工作进程偶然挂掉的情况。
默认情况下消息应答是关闭的。是时候使用 false(auto-ack配置项)参数将其开启了
这里以 Node.js 为例子,其他语言可以自行搜索
// 消费者消费时候的代码
await channel.consume(queueName, msg => {
console.log('koala:', msg.content.toString());
//... 这里可以放业务逻辑处理的代码,消费者完成后发送回执应答
channel.ack(msg);// 消息应答
}, { noAck: false });
可以将prefetch count
项的值配置为1,这将会指示 RabbitMQ 在同一时间不要发送超过一条消息给每个消费者。换句话说,直到消息被处理和应答之前都不会发送给该消费者任何消息。取而代之的是,它将会发送消息至下一个比较闲的消费者或工作进程。
这里以 Node.js 为例子,amqplib 库对于限流实现提供的接口方法 prefetch。
prefetch 参数说明:
// 创建消费者的时候 限流参数设置
await channel.prefetch(1, false);
如果一个生产者,两个消费者,发放消息,我想要的队列先给消费者1发,发完消费者1发消费者2,这样有顺序的交互发送,应该现在哪一种交换机呢?注意是交互,看完之后想一下?还有消费者完成后有没有手动回调消息队列完成的必要?消息持久化有必要没,持久化有什么好处?
(看完消息队列的消息传递,你会有疑问管道中的消息(生产者)是怎么被消费者消费的 放入队列,然后从队列被取出)
这里在生成订单时候,不需要直接操作数据库 IO ,预扣库存。先扣除了库存,保证不超卖,然后异步生成用户订单,这里用到一次即时消费队列
,这样响应给用户的速度就会快很多;而且还要保证不少卖,用户拿到了订单,不支付怎么办?我们都知道现在订单都有有效期,再使用一个消息队列
,用于判断订单支付超时,比如说用户五分钟内不支付,订单就失效了,订单一旦失效,就会加入新的库存。这也是现在很多网上零售企业保证商品不少卖采用的方案。订单量比较少的情况下,生成订单非常快,用户几乎不用排队。
消息队列
中然后马上返回用户结果,由消息队列异步
的进行这些操作。
假如有大量的用户注册,发生了高并发:
邮件接口承受不住,或是分析信息时的大量计算使 cpu 满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。直接开启服务,装个 RabbitMQ,挺有意思的,就算一个 HelloWorld 也能尝试出很多内容。而且本文说的很多内容都可以用 redis 来实现,也可以去看下我的 redis 文章。顺便说一句设计模式和数据结构是两个好东西,越来越能感觉到。文章代码地址:https://github.com/koala-coding/simple_rabbitmq
https://www.cnblogs.com/baidawei/p/9172433.html
https://www.sojson.com/blog/48.html
http://www.imooc.com/article/293742
https://www.zhihu.com/question/34243607/answer/58314162
https://bbs.csdn.net/topics/392169691?page=1
https://mp.weixin.qq.com/s/wTkwJXlNr5CaI7uRntJ42A