RabbitMQ入门教程

摘要: 使用RabbitMQ的消息队列,可以有效提高系统的峰值处理能力。

RabbitMQ简介

RabbitMQ消息代理(Message Broker),它支持多种异步消息处理方式,最常见的有:

  • Work Queue:将消息缓存到一个队列,默认情况下,多个worker按照Round Robin的方式处理队列中的消息。每个消息只会分配给单个worker。
  • Publish/Subscribe:每个订阅消息的消费者都会收到消息,因此每个消息通常会分配给多个worker,每个worker对消息进行不同的处理。

RabbitMQ还支持RoutingTopics、以及Remote procedure calls (RPC)等方式。

对于不同的消息处理方式,有一点是相同的,RabbitMQ是介于消息的生产者和消费者的中间节点,负责缓存和分发消息。RabbitMQ接收来自生产者的消息,缓存到内存中,按照不同的方式分发给消费者。RabbitMQ还可以将消息写入磁盘,保证持久化,这样即使RabbitMQ意外崩溃了,消息数据不至于完全丢失。

为什么使用RabbitMQ?

最简单的一点在于,它支持Work Queue等不同的消息处理方式,可以用于不同的业务场景。对于我们Fundebug来说,目前只用过RabbitMQ的Work Queue,即消息队列。

使用消息队列,可以将不算紧急、但是非常消耗资源的计算任务,以消息的方式插入到RabbitMQ的队列中,然后使用多个处理模块处理这些消息。

这样做最大的好处在于:提高了系统峰值处理能力。因为,来不及处理的消息缓存在RabbitMQ中,避免了同时进行大量计算导致系统因超负荷运行而崩溃。而那些来不及处理的消息,会在峰值过去之后慢慢处理掉。

另一个好处在于解耦。消息的生产者只需要将消息发送给RabbitMQ,这些消息什么时候处理完,不会影响生产者的响应性能。

广告:欢迎免费试用Fundebug,为您监控线上代码的BUG,提高用户体验~

安装并运行RabbitMQ

使用Docker运行RabbitMQ非常简单,只需要执行一条简单的命令:

sudo docker run -d --name rabbitmq -h rabbitmq -p 5672:5672 -v /var/lib/rabbitmq:/var/lib/rabbitmq registry.docker-cn.com/library/rabbitmq:3.7

对于不熟悉Docker的朋友,我解释一下docker的命令选项:

  • -d : 后台运行容器
  • –name rabbitmq : 将容器的名字设为rabbitmq
  • -h rabbitmq : 将容器的主机名设为rabbitmq,希望RabbitMQ消息数据持久化保存到本地磁盘是需要设置主机名,因为RabbitMQ保存数据的目录为主机名
  • -p 5672:5672 : 将容器的5672端口映射为本地主机的5672端口,这样可以通过本地的5672端口访问rabbitmq
  • -v /var/lib/rabbitmq:/var/lib/rabbitmq:将容器的/var/lib/rabbitmq目录映射为本地主机的/var/lib/rabbitmq目录,这样可以将RabbitMQ消息数据持久化保存到本地磁盘,即使RabbitMQ容器被删除,数据依然还在。

Docker为官方镜像提供了加速服务,因此命令中Rabbit的Docker镜像名为registry.docker-cn.com/library/rabbitmq:3.7

如果你不会Docker,建议你学习一下。如果你不想学,Ubuntu 14.04下安装RabbitMQ的命令是这样的:

sudo echo "deb http://www.rabbitmq.com/debian testing main" | sudo tee -a /etc/apt/sources.listwget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -sudo apt-get updatesudo apt-get install rabbitmq-server

启动RabbitMQ:

sudo service rabbitmq-server start

消息队列代码示例

下面,我们使用Node.js实现一个简单消息队列。

消息的生产者:sender.js

const amqp = require("amqplib");const queue = "demo";async function sendMessage(message){ const connection = await amqp.connect("amqp://localhost"); const channel = await connection.createChannel(); await channel.assertQueue(queue); await channel.sendToQueue(queue, new Buffer(message), { // RabbitMQ关闭时,消息会被保存到磁盘 persistent: true });}setInterval(function(){ sendMessage("Hello, Fundebug!");}, 1000)

  • 在sender中,不断地往消息队列中发送”Hello, Fundebug!”。

消息的消费者:receiver.js

const amqp = require("amqplib");const queue = "demo";async function receiveMessage(){ const connection = await amqp.connect("amqp://localhost"); const channel = await connection.createChannel(); await channel.assertQueue(queue); await channel.consume(queue, function(message) { console.log(message.content.toString()); channel.ack(message); });}receiveMessage();

  • 在receiver中,从消息队列中读出message并打印。

我们用到了amqplib模块,用于与RabbitMQ进行通信,对于具体接口的细节,可以查看文档

在调用sendToQueue时,将persistent属性设为true,这样RabbitMQ关闭时,消息会被保存到磁盘。测试这一点很简单:

  • 关闭receiver
  • 启动sender,发送消息给RabbitMQ
  • 重启RabbitMQ(sudo docker restart rabbitmq)
  • 启动receiver,会发现它可以接收sender在RabbitMQ重启之前发送的消息

由于RabbitMQ容器将保存数据的目录(/var/lib/rabbitmq)以数据卷的形式保存在本地主机,因此即使将RabbitMQ容器删除(sudo docker rm -f rabbitmq)后重新运行,效果也是一样的。

另外,这段代码采用了Node.js最新的异步代码编写方式:Async/Await,因此非常简洁,感兴趣的同学可以了解一下。

这个Demo的运行方式非常简单:

  • 运行RabbitMQ容器

sudo ./start_rabbitmq.sh

  • 发送消息 node ./sender.js
  • 接收消息 node ./receiver.js

在receiver端,可以看到不停地打印”Hello, Fundebug!”。

代码仓库地址为:Fundebug/rabbitmq-demo

自动重连代码示例

在生产环境中,RabbitMQ难免会出现重启的情况,比如更换磁盘或者服务器、负载过高导致崩溃。因为RabbitMQ可以将消息写入磁盘,所以数据是”安全”的。但是,代码中必须实现自动重连机制,否则RabbitMQ停止时会导致Node.js应用崩溃。这里提供一个自动重连的代码示例,给大家参考:

消息生产者:sender_reconnect.js

const amqp = require("amqplib");const queue = "demo";var connection;// 连接RabbitMQasync function connectRabbitMQ(){ try { connection = await amqp.connect("amqp://localhost"); console.info("connect to RabbitMQ success"); const channel = await connection.createChannel(); await channel.assertQueue(queue); await channel.sendToQueue(queue, new Buffer("Hello, Fundebug!"), { // RabbitMQ重启时,消息会被保存到磁盘 persistent: true }); connection.on("error", function(err) { console.log(err); setTimeout(connectRabbitMQ, 10000); }); connection.on("close", function() { console.error("connection to RabbitQM closed!"); setTimeout(connectRabbitMQ, 10000); }); } catch (err) { console.error(err); setTimeout(connectRabbitMQ, 10000); }}connectRabbitMQ();

消息消费者:receiver_reconnect.js

const amqp = require("amqplib");const queue = "demo";var connection;// 连接RabbitMQasync function connectRabbitMQ(){ try { connection = await amqp.connect("amqp://localhost"); console.info("connect to RabbitMQ success"); const channel = await connection.createChannel(); await channel.assertQueue(queue); await channel.consume(queue, async function(message) { console.log(message.content.toString()); channel.ack(message); }); connection.on("error", function(err) { console.log(err); setTimeout(connectRabbitMQ, 10000); }); connection.on("close", function() { console.error("connection to RabbitQM closed!"); setTimeout(connectRabbitMQ, 10000); }); } catch (err) { console.error(err); setTimeout(connectRabbitMQ, 10000); }}connectRabbitMQ();

这样的话,即使RabbitMQ重启,sender和receiver也可以自动重新连接RabbitMQ。如果你希望监控RabbitMQ是否出错,不妨使用我们Fundebug的Node.js错误监控服务,在连接触发”error”或者”close”事件时,第一时间发送报警,这样开发者可以及时定位和处理BUG。

参考

版权声明:
转载时请注明作者Fundebug以及本文地址:
https://blog.fundebug.com/2018/04/20/rabbitmq_tutorial/

您的用户遇到BUG了吗?

体验Demo 免费使用

.copyright * { box-sizing: border-box; }

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏白驹过隙

ACE - Ubuntu下环境搭建

2659
来自专栏pangguoming

整合spring cloud云架构 - SSO单点登录之OAuth2.0登录认证(1)

之前写了很多关于spring cloud的文章,今天我们对OAuth2.0的整合方式做一下笔记,首先我从网上找了一些关于OAuth2.0的一些基础知识点,帮助大...

1266
来自专栏分布式系统和大数据处理

C#编写简单的聊天程序

这是一篇基于Socket进行网络编程的入门文章,我对于网络编程的学习并不够深入,这篇文章是对于自己知识的一个巩固,同时希望能为初学的朋友提供一点参考。文章大体分...

582
来自专栏喔家ArchiSelf

全栈必备 Log日志

Log日志,不论对开发者自身,还是对软件系统乃至产品服务都是非常重要的事情。每个开发者都接触过日志,以至于每个人对日志的了解都会有所不同。

602
来自专栏macOS 开发学习

cocos2d-objc 3.0+ 游戏开发学习手册(一): 简介与安装

目前网络中关于cocos2d-iphone 方面的资料,大部分都是基于c++ 语言跨平台的cocos2d-x,偶尔搜到一些cocos2d方面的也由于版本比较早(...

843
来自专栏人工智能

带你构建你的的第一个Python和Django应用程序

群内不定时分享干货,包括最新的python企业案例学习资料和零基础入门教程,欢迎初学和进阶中的小伙伴入群学习交流 ? 您以前可能听说过Python,特别是如果您...

2125
来自专栏熊二哥

JMeter快速入门

今天的年会已过,仍然是空手而归,不过俺坚信能让生活稳定永远都是努力。由于隔壁组负责年会的抢红包项目,因而趁此机会把通过工具模拟高并发的知识补了补,通过和身边大师...

2225
来自专栏磨磨谈

Ceph用户邮件列表Vol45-Issue1

这个问题是作者一个集群中(ceph 0.94.5)出现了一个磁盘损坏以后造成了一些对象的丢失,然后在做了一定的处理以后,集群状态已经正常了,但是还是新的请求会出...

652
来自专栏Felix的技术分享

用Android Studio调试Framework层代码

2115
来自专栏静晴轩

Android项目中文字乱码问题

Eclipse之所以会出现乱码问题是因为eclipse编辑器选择的编码规则是可变的。一般默认都是UTF-8或者GBK(对于字符编码可参见字符编码的故事),当从外...

36311

扫码关注云+社区