前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot:RabbitMQ消息重复消费场景及解决方案

SpringBoot:RabbitMQ消息重复消费场景及解决方案

作者头像
Freedom123
发布2024-03-29 14:46:28
2910
发布2024-03-29 14:46:28
举报
文章被收录于专栏:DevOpsDevOps

简介

首先我们来看一下消息的传输流程。消息生产者–>MQ–>消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。

所以消息重复也就出现在两个阶段:

  • 1、生产者多发送了消息给MQ;
  • 2、MQ的一条消息被消费者消费了多次。

第一种场景很好控制,只要保证消息生成者不重复发送消息给MQ即可。

场景

在保证MQ消息不重复的情况下,消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常(或者是服务中断),MQ没有接收到确认,此时MQ不会将发送的消息删除,为了保证消息被消费,当消费者网络稳定后,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。

1、消息发送者发送1万条消息给MQ

代码语言:javascript
复制
@GetMapping("/rabbitmq/sendToClient")
public String sendToClient() {
    String message = "server message sendToClient";
    for (int i = 0; i < 10000; i++) {
        amqpTemplate.convertAndSend("queueName3",message+": "+i);
 
    }
    return message;
}

启动消息发送服务,调用接口发送消息,mq成功收到1万条消息。

2、消费者监听消费消息

代码语言:javascript
复制
@RabbitListener(queues = "queueName3")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage(String message) {
    System.out.println("接收者2--接收到queueName3队列的消息为:"+message);
}

启动消费者服务,然后中断消费服务,此时消费到了第7913个消息:

此时查看MQ的消息,现在MQ队列中应该还有2087个消息,但还有2088个消息,说明最后一个消息被消费了没有被MQ服务确认。

再次启动消费者服务,消息从第7913个消息开始消费,而不是第7914个消息

解决方案

为了保证消息不被重复消费,首先要保证每个消息是唯一的,所以可以给每一个消息携带一个全局唯一的id,流程如下:

  • 1.消费者监听到消息后获取id,先去查询这个id是否存中
  • 2.如果不存在,则正常消费消息,并把消息的id存入 数据库或者redis中(下面的编码示例使用redis)
  • 3.如果存在则丢弃此消息
1.生产者
代码语言:javascript
复制
/**
 * @Description:  发送消息 模拟消息重复消费
 *      消息重复消费情景:消息生产者已把消息发送到mq,消息消费者在消息消费的过程中突然因为网络原因或者其他原因导致消息消费中断
 *      消费者消费成功后,在给MQ确认的时候出现了网络波动,MQ没有接收到确认,
 *      为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息
 * @param:
 * @return: java.lang.String
 * @Author: chenping
 */
@GetMapping("/rabbitmq/sendMsgNoRepeat")
public String sendMsgNoRepeat() {
    String message = "server message sendMsgNoRepeat";
    for (int i = 0; i <10000 ; i++) {
        Message msg = MessageBuilder.withBody((message+"--"+i).getBytes()).setMessageId(UUID.randomUUID()+"").build();
        amqpTemplate.convertAndSend("queueName4",msg);
    }
    return message;
}
2.消费者方案1:将id存入string中(单消费者场景):

这样一个队列,redis数据只有一条,每次消息过来都覆盖之前的消息,但是消费者多的情况不适用,可能会存在问题–一个消息被多个消费者消费

代码语言:javascript
复制
@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage(Message message) throws UnsupportedEncodingException {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(),"utf-8");
 
    String messageRedisValue = redisUtil.get("queueName4","");
    if (messageRedisValue.equals(messageId)) {
        return;
    }
    System.out.println("消息:"+msg+", id:"+messageId);
 
    redisUtil.set("queueName4",messageId);//以队列为key,id为value
}
3.消费者方案2:将id存入list中(多消费者场景)

这个方案可以解决多消费者的问题,但是随着mq的消息增加,redis数据越来越多,需要去清除redis数据。

代码语言:javascript
复制
@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage1(Message message) throws UnsupportedEncodingException {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(),"utf-8");
 
    List<String> messageRedisValue = redisUtil.lrange("queueName4");
    if (messageRedisValue.contains(messageId)) {
        return;
    }
    System.out.println("消息:"+msg+", id:"+messageId);
 
    redisUtil.lpush("queueName4",messageId);//存入list
}
4.消费者方案3:将id以key值增量存入string中并设置过期时间:

以消息id为key,消息内容为value存入string中,设置过期时间(可承受的redis服务器异常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)

代码语言:javascript
复制
@RabbitListener(queues = "queueName4")//发送的队列名称     @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {
    String messageId = message.getMessageProperties().getMessageId();
    String msg = new String(message.getBody(),"utf-8");
 
    String messageRedisValue = redisUtil.get(messageId,"");
    if (msg.equals(messageRedisValue)) {
        return;
    }
    System.out.println("消息:"+msg+", id:"+messageId);
 
    redisUtil.set(messageId,msg,10L);//以id为key,消息内容为value,过期时间10分钟
}

测试

首先,启动消息生成服务,发送一万条消息:

启动消息消费服务,然后中断服务,消费了1934条消息:

未被消费的消息条数为8067条,多了一条(10000-1934=8066 ):

再次启动消费者服务,消费者舍弃了已被消费的第1934条消息

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-03-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 场景
  • 解决方案
    • 1.生产者
      • 2.消费者方案1:将id存入string中(单消费者场景):
        • 3.消费者方案2:将id存入list中(多消费者场景)
          • 4.消费者方案3:将id以key值增量存入string中并设置过期时间:
          • 测试
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档