前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go之一步步学习RabbitMQ(一)

Go之一步步学习RabbitMQ(一)

作者头像
灰子学技术
发布2023-10-30 15:44:28
1200
发布2023-10-30 15:44:28
举报
文章被收录于专栏:灰子学技术灰子学技术

写在前面的话:最近笔者在学习RabbitMQ,便尝试着通过下面的学习过程,来尽量还原RabbitMQ为什么如此设计,以及它是如何解决这些问题的。当中如有不对或者理解偏差的地方,还请大家不吝赐教,多多留言。如果你觉得这篇文章真的帮到了你,还请你顺手转发下。


背景知识:

在学习RabbitMQ之前,我们需要对下面的知识有些概念,

生产者(producer):产生并发送消息的程序。

队列(queue):存在RabbitMQ中的邮筒,虽然消息是在应用程序和RabbitMQ中进行传递,但队列才是唯一能够存储消息的地方。队列的大小取决于宿主机器的内存和磁盘容量,它本质上是一个巨大的消息缓存池。多个生产者可以发送消息给同一个队列,多个消费者也可以从同一个队列中读取消息。这个队列有一个特点,先进先出。

消费者(consuming):等待接收消息的程序。

参考知识: 消息队列基础知识,还请参考笔者的另外两篇文章: https://mp.weixin.qq.com/s/uFL6a52FwAAneSJ4GniP5Q https://mp.weixin.qq.com/s/F0DbjgavwH3MUmPlRc9sDg Go 语言中与RabbitMQ交互的客户端包go-amqp: https://mp.weixin.qq.com/s/ALjCxEGrNOBjGV7vn5_SHQ


问题一:RabbitMQ如何解决生产者生产过快,消费者消费过慢的问题?

在看这个问题之前,我们先看下这个问题:网络中,如果一个机器(producer)想把数据发送给另外一台机器(consumer),那么它应该怎么做?

答案是:它们之间需要建立一个连接,如下图所示,这样貌似就解决了生产者与消费者之间传递数据的问题。不过这样以来producer与consumer之间就绑定了,这个连接也要一直存在,要不然它们之间就没有办法通讯。

如果它们都很闲,或者它们的处理速度差不多(备注:生产者生产数据的速度和消费者消费数据的速度相当)的情况下,这都不是问题。

可是,一旦生产者生产数据过快,或者消费者消费数据过慢,这样就会出问题,生产者产生的数据没有办法被及时处理完。这样就会导致这些数据被丢弃掉,或者生产者只能暂时停止继续生产数据,但是生产者又被绑死在这个消费者上面,也没有办法去干别的事情。

要解决上面的问题,我们该怎么办呢?一般有两种办法:

方法一:新增消费者并让生产者与它再建立连接,然后生产者自己决策如何给这么多的消费者分配数据。这样的话会有两个结果:

第一,生产者需要与另外一个消费者再建立一条连接。第二,生产者需要自己添加数据分发策略,这样会导致生产者的逻辑变得复杂了很多。

方法二:将生产者产生的数据放到缓存中(也就是消息队列中),而消费者也从这个缓存中获取数据,如下图所示,这也是RabbitMQ的实现方式。这样的话,会有两个好处:

第一,生产者不需要与消费者绑定,它们只需要与消息队列绑定就好了,生产者和消费者成功完成解耦操作。第二,生产者和消费者的速度,可以不一致,就算生产者很快,消费者很慢也没有问题,只要它们能够保证消息队列不满的话,消费者就可以慢慢处理,生产者可以不停的去生产数据。

下面我们来看一下go-amqp例子,是如何实现的这一步操作:

左边是生产者的核心代码部分,右边是消费者的核心代码部分。

运行的时候,我们需要按照下面的步骤来操作:

首先,启动rabbitmq服务器

$ rabbitmq-server

........

Starting broker... completed with 6 plugins // 表示启动成功

其次,启动消费者

$ ./receive

最后,启动生产者,分别发送三次数据给rabbitmq-server

$ ./send hello 2019/11/03 16:32:45 [x] Sent hello

$ ./send world 2019/11/03 16:32:53 [x] Sent world

$ ./send I love U 2019/11/03 16:33:13 [x] Sent I love U

说明:通过上面的消费者的输出,我们可以看出,生产者每生产一个数据,消费者都会立即取走一个数据进行处理。

问题二:RabbitMQ如何解决多个消费者调度的问题?

当一个消费者怎么都处理不过来的时候,最终还是应该新增消费者来处理,如下图所示。在新增消费者的时候后,RabbitMQ的优势就体现出来了,新增消费者的时候,消费者只是与消息队列建立了新的连接,并且也不会增加生产者的代码复杂度。

不过这样也带来了一个新的问题:消息队列怎么决定,同一时刻哪一个消费者来消费这个消息?

RabbitMQ最简单的方式就是时间轮询策略,也就是保证队列先进先出,本时刻哪一个消费者来消息数据,就给到哪一个消费者。

下面是多个消费者调度的展示例子, 我们启动两个消费者,一个生产者,如下图所示: 消费者一:$ ./receive

消费者二:$ ./receive

生产者:

备注:通过消费者一和二输出的结果来看,对于生产者生产的数据,两个消费者按照时间顺序,依次轮询输出。

问题三:RabbitMQ如何保证消息队列中的数据,确实被消费者已经处理掉了?

在真实的网络中,网络往往不可靠。也就是说有可能会存在消息被消费者拿走之后,因为网络原因导致消息并没有真正发送到消费者。

RabbitMQ采用的是消息确认机制,也就是消费者取走消息之后,在处理完了这个消息,需要要主动发送ACK给消息队列,消息队列在收到这个ACK之后,才可以删除这个消息。例子如下所示:

消息确认机制的代码,只要是在对消费者设置的时候,auto-ack设置成false,也就是需要消费者主动回复Ack。 1. 消费者主动回复ACK的过程,与上面的例子类似,并无特别之处。 2. 消费者不回复ACK消息,会发现生产者发送的消息,一直在rabbitmq-server上面保留着,只要有消费者启动,就会将这些数据再消费一次。

生产者发送的消息内容:

消费者消费了第一和第二次数据:

不过,这也要求消息队列必须在ACK回来的这段时间内保证不删除,可是如果ACK一直不来呢?

这样就会导致这个消息一直放在消息队列中不被处理,进而导致RabbitMQ上面的内存泄漏。

我们先来看下消息丢失的场景,一般有三种:第一种,消费者真的就没有回复;第二种,消费者回复了,但是网络原因给丢弃了;第三种,网络断开或者连接关掉。第一种和第三种最为常见,第二种,其实并不是很常见。

第一种情况,其实是消费者那端的代码问题,需要消费者修复才行。

第二种情况,往往是因为使用的底层通讯库有bug导致的,因为在连接不断开的前提下,只要消息发出去了,TCP协议会保证消息到达对端的。RabbitMQ并没有对这种场景做处理,因为RabbitMQ并不知道,这个消息是消费者丢失了,还是网络丢失了,当然了它也不应该关心这也业务场景。不过在设计的时候,我们到可以让消费者根据自身业务,添加超时处理机制,例如:消费者在长时间得不到RabbitMQ新的消息的时候,可以尝试去重发上一个消息的ACK消息。

第三种情况,RabbitMQ在监测到网络断开或者连接关掉的时候,会主动将这个消息再一次放回到消息队列里面,让后续消费者可以再取一次消息。

问题四:RabbitMQ如何保证消费者处理的公平性?

上面讨论的消息内容都是相同或者相似大小的情况下,一旦者消息的大小不同,在RabbitMQ的轮询策略下,就很有可能导致大任务的消息被分配给同一个消费者,导致这个消费者很忙,而其他的消费者却比较闲。

基于这个问题,RabbitMQ采用公平策略做了处理,大体就是在消费者没有将分配到的消息处理完的时候,不在分配新的消息给他,这样就能够让闲一点的消费者去消息队列继续拿新的消息,而忙的消费者一心一意的处理拿到的这个大任务消息。例子如下所示:

代码主要涉及在消费者里,新增的ch.Qos中prefetch count的设定,我们这里设定的数值是1,也就是当消费者拿走数据之后,一直没有回复ACK给rabbitmq-server,那么rabbitmq-server就一直不在给这个消费者分配新的消息。

消费者一,拿到了一个处理时间比较久的数据,所以一直在处理这个消息。

消费者二,拿到了比较短的数据,所以可以很快的处理完,便可以很快的分配到别的数据。

问题五:一旦RabbitMQ挂掉了,该怎么办呢?

基于这个问题,RabbitMQ也做了处理,叫做消息持久化,在RabbitMQ挂掉之前的那些消息队列中的消息,它都会存到硬盘里面,等到RabbitMQ重启之后,会将这些数据重新恢复出来。

当然,对于生产者已经发送,却没有收到确认的消息,需要生产者单独做异常处理。

这一部分操作代码里面主要是一个消息持久化flag的设定,生产者和消费者里面都需要设置,对于效果的展示这里就不做介绍了。

总结:

本文只是对rabbitmq的基本使用,碰到的问题以及解决方法做了详解和举例说明,希望对你有所帮助。对于rabbitmq的路由部分,是另外一类内容,笔者会在后面一篇给出

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 灰子学技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景知识:
  • 问题一:RabbitMQ如何解决生产者生产过快,消费者消费过慢的问题?
  • 问题二:RabbitMQ如何解决多个消费者调度的问题?
  • 问题三:RabbitMQ如何保证消息队列中的数据,确实被消费者已经处理掉了?
  • 问题四:RabbitMQ如何保证消费者处理的公平性?
  • 问题五:一旦RabbitMQ挂掉了,该怎么办呢?
  • 总结:
相关产品与服务
消息队列
腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档