前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ学习 (二)---多消费者工作时的消息处理

RabbitMQ学习 (二)---多消费者工作时的消息处理

作者头像
haoming1100
发布2018-04-28 15:40:25
2.1K0
发布2018-04-28 15:40:25
举报
文章被收录于专栏:步履前行步履前行

ACK

在上一篇中,我们尝试安装并且运行了一个一对一的MQ,这一篇中,我们来看下多消费者和持久化相关的问题!

在我们的应用中,应用通常部署多个服务(当然,你部署一台我也没办法,/表情包),因为即使我们的一台机器挂掉了,还有其他的机器提供着支持。所以应用到MQ的场景中,比如我们有N台生产者,然后有C1、C2 两台消费者,P生产消息到队列,然后C1 、C2进行消费(这里之所以会提到多消费者,是因为如果我们只有一台消费者的话,队列中的消息太多的话,消费者只能一直在处理消息,直到全部处理完,这样如果这台消费者还有其他要处理的业务的话,只能和处理消息的业务线程进行竞争,造成业务的处理不及时)。

在消费者处理消息的时候会有处理时间,我们前面使用的代码一旦向消费者发送消息,队列就会标记为立即删除,此时,一旦消费者突然挂掉,我们就失去了要处理的消息,但是我们肯定不想失去任何消息,如果C1消费者挂掉,我们就转给C2消费者进行处理。

RobbitMQ支持消息确认。消费者返回ACK,通知队列已经成功的处理消息,可以进行操 作,这样就避免了消息的执行失败,被队列删除。即如果消费者没有返回ACK,那么队列将把同样的消息发送给其他的消费者,确保消息的执行。

接口 中表示了如果时true,则时一次性消息,如果false,则是确认的消息。

所以我们消费者的代码只要改动一下即可

持久性

我们已经确认了消息的执行返回,但是这样只是在消费者中的保证,如果时RabbitMQ 服务器挂掉的话,我们的消息仍旧会丢失。

因此我们应该将队列的消息标记为持久。即使服务器挂掉了,再重启,仍旧会保留队列中的消息。

为了引入MQ的持久性,我们要在现有的代码中有2个地方的改动。

当然,此时服务器并没有因为这个地方声明了durable是True就持久性了,因为我们的队列名称仍旧是hello_word,RabbitMQ不允许对现有的队列声明不同的配置,可能是一种保护,因此,我们有一个更加简单的办法,就是更换一个新的队列名称。

然后在发布消息消息的时候声明MessageProperties.PERSISTENT_TEXT_PLAIN

这样,即使服务器重新启动了,仍旧会保留现有的消息

注意:

MessageProperties.PERSISTENT_TEXT_PLAIN 告诉MQ Server是一个持久性的消息, 但是消息持久化并不能完全保证消息不会丢失。

 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受了消息并没有保存它时,仍然有一个短时间窗口。

另外MQ并不是对每个消息都保存到磁盘中,它可能只是保存到缓存中。

所以我们的服务器应该也具有集群的特点才可能最大化的去保存消息。

公平性

还是引入我们上一节的韩信打野问题,假如我们在开局的时候,遇到抢位置,双打野的情况的时候,就会很尴尬。韩信把红buff和蓝buff都打了,阿珂找不到野了,一生气,你不给我野怪打,我就去送或者挂机,那样我们的体验会很差。

因此,我们要有一个公平的办法,就是韩信打红buff,阿珂打蓝buff。更好的带领我们走向胜利。

在RabbitMQ中,我们可以使用channel.basicQos()方法,设置每个消费者需要处理的消息数,比如设置channel.basicQos(1),这样每个消费者只处理一个消息,韩信也只打一个野怪,剩下的另一个野怪给阿珂打,等韩信打完红buff后,再去打其他的野怪。如果只有韩信一个的话,当然所有的野怪都给它。

今天的MQ 我们就介绍到这里,有什么不对的地方请大家积极反馈

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-04-15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档