前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMq消费消息

RabbitMq消费消息

作者头像
写一点笔记
发布2022-08-11 15:53:22
1.2K0
发布2022-08-11 15:53:22
举报
文章被收录于专栏:程序员备忘录程序员备忘录

rabbitmq的消息消费有两种方式,推模式和拉模式。推模式采用basic.consume进行消费,而拉模式则是调用的basic.Get进行消费。

推模式: 1:推模式接收消息是最有效的一种消息处理方式。channel.basicConsume(queneName,consumer)方法将信道(channel)设置成投递模式,直到取消队列的订阅为止;在投递模式期间,当消息到达RabbitMQ时,RabbitMQ会自动地、不断地投递消息给匹配的消费者,而不需要消费端手动来拉取,当然投递消息的个数还是会受到channel.basicQos的限制。

2:推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。优点是消费者总是有一堆在内存中待处理的消息,所以当真正去消费消息时效率很高。缺点就是缓冲区可能会溢出。3:由于推模式是信息到达RabbitMQ后,就会立即被投递给匹配的消费者,所以实时性非常好,消费者能及时得到最新的消息。

拉模式:1:如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方法来进行消费消息。2:拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。3:由于拉模式需要消费者手动去RabbitMQ中拉取消息,所以实时性较差;消费者难以获取实时消息,具体什么时候能拿到新消息完全取决于消费者什么时候去拉取消息。

结论 1:不能在循环中使用拉模式来模拟推模式,因为拉模式每次都需要去消息中间件中拉取消息来消费,所以会严重影响RabbitMQ性能。 2:要想实现高吞吐量,消费者需要使用推模式。 不言不语技术 https://www.cnblogs.com/hzcya1995/p/13302427.html

1.推模式

在推模式中,可以通过持续订阅的方式来消费消息,使用到的相关类有:

代码语言:javascript
复制
import com.rabbitmq.client.consumer;
import com.rabbitmq.client.defaultconsumer;

接收消息一般通过实现consumer接口或者继承defaultconsumer类实现,当调用与consumer相关的api方法时,不同的订阅采用不同的消费者标签consumerTag来区分彼此,在同一个channel中的消费者也需要通过唯一的消费者标签做区分,关键消费代码如下:

代码语言:javascript
复制
boolean autoAck=false;
channel.basicQos(64);
channel.basicConsume(queueName,autoAck,consumerTag,new DefaultConsume(channel){
public void handleDelivery(String consumerTag,Envelop envelope,AMQP.BasicProperties properties,byte[] body){
String routingKey=envelop.getRoutingKey();
String contentType=properties.getContentType();
//这里做具体的业务,我们开发中使用的注解    @RabbitListener(queues = RabbitConsts.WENKU_SDFILE_KBASE)就是在这里做的,就是AOP。aop很重要的
//这里的false表示不自动ack,为true表示自动ack,但是自动ack会有消息丢失的潜在问题
channel.basicAck(deliveryTag,false);
}
})

channel.basic有很多类似的接口。基本的参数如下:

queue:队列的名称

autoAck:设置是否自动确认,建议设置成false,就是不自动确认

consumerTag:消费者标签,用来区分对个消费者

nolocal:设置为true表示不能将同一个connection中生产者发送的消息传递给这个connection中的消费者。

exclusive:设置是否排他

arguments:设置消费者的其他参数

callBack:设置消费者的回调函数,用来处理rabbitmq推送过来的消息,比如defaultconsumer,使用客户端重写其中的方法。

当然和spring ioc中的bean的初始化一样,channel.basicConsume也有很多事件处理器,比如handleConsumerOk,handleCannelOK,handleCannel,handshutDownSign,handReconverOk等。可以去做一写前置或者后置的判断。

在使用推模式的时候有个参数需要注意:prefetch。这个参数的含义是一次性可以消费多少条消息,如果设置了改参数,消费者会通过队列进行缓存,同事rabbitmq队列中将有消费者数量*prefetch数量的消息没有收到ack,知道rabbitmq中的消息全部被ack之后,才会发送prefetch数量的消息给一个消费者。这个参数设置的过大会导致消费者的缓存队列溢出,或者oom现象。如果有上下游业务关系的还是配置成1吧!

2.拉模式

拉模式通过channel.basicGet方法可以单条的获取消息,其返回值为GetResponse,channel类的basicGet方法没有其他的重载方法,拉模式不要放在一个while循环中,那样消耗会很大,仅有:

代码语言:javascript
复制
GetResponse basicGet(String queue,boolean ack);

queue表示队列名,ack表示是否自动确认。如果设置为false,同样需要调用channel.basicAck确认消息被消费了。

代码语言:javascript
复制
GetResponse response=channel.basicGet(“ttt”,false);
//do something
channel.basicAck(response.getEnvelope().getDeliverTag(),false);
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 写点笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档